send_msg.go 11.3 KB
Newer Older
programor_guo's avatar
programor_guo 已提交
1
package msg
H
hailong 已提交
2 3

import (
4 5
	"Open_IM/pkg/common/config"
	"Open_IM/pkg/common/constant"
6
	"Open_IM/pkg/common/db"
7 8 9
	http2 "Open_IM/pkg/common/http"
	"Open_IM/pkg/common/log"
	"Open_IM/pkg/grpc-etcdv3/getcdv3"
10 11
	pbChat "Open_IM/pkg/proto/chat"
	pbGroup "Open_IM/pkg/proto/group"
programor_guo's avatar
programor_guo 已提交
12
	sdk_ws "Open_IM/pkg/proto/sdk_ws"
13
	"Open_IM/pkg/utils"
H
hailong 已提交
14
	"context"
落凡尘.'s avatar
落凡尘. 已提交
15
	"encoding/json"
programor_guo's avatar
programor_guo 已提交
16
	"github.com/garyburd/redigo/redis"
H
hailong 已提交
17
	"math/rand"
落凡尘.'s avatar
落凡尘. 已提交
18
	"net/http"
H
hailong 已提交
19
	"strconv"
落凡尘.'s avatar
落凡尘. 已提交
20
	"strings"
H
hailong 已提交
21 22 23
	"time"
)

落凡尘.'s avatar
落凡尘. 已提交
24
type MsgCallBackReq struct {
25 26 27 28 29 30 31 32 33 34
	SendID       string `json:"sendID"`
	RecvID       string `json:"recvID"`
	Content      string `json:"content"`
	SendTime     int64  `json:"sendTime"`
	MsgFrom      int32  `json:"msgFrom"`
	ContentType  int32  `json:"contentType"`
	SessionType  int32  `json:"sessionType"`
	PlatformID   int32  `json:"senderPlatformID"`
	MsgID        string `json:"msgID"`
	IsOnlineOnly bool   `json:"isOnlineOnly"`
落凡尘.'s avatar
落凡尘. 已提交
35 36 37 38 39 40 41 42 43 44
}
type MsgCallBackResp struct {
	ErrCode         int32  `json:"errCode"`
	ErrMsg          string `json:"errMsg"`
	ResponseErrCode int32  `json:"responseErrCode"`
	ResponseResult  struct {
		ModifiedMsg string `json:"modifiedMsg"`
		Ext         string `json:"ext"`
	}
}
H
hailong 已提交
45

programor_guo's avatar
programor_guo 已提交
46 47 48
func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) {
	msg.ServerMsgID = GetMsgID(msg.SendID)
	if msg.SendTime == 0 {
programor_guo's avatar
programor_guo 已提交
49
		msg.SendTime = utils.GetCurrentTimestampByMill()
programor_guo's avatar
programor_guo 已提交
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
	}
	switch msg.ContentType {
	case constant.Text:
		fallthrough
	case constant.Picture:
		fallthrough
	case constant.Voice:
		fallthrough
	case constant.Video:
		fallthrough
	case constant.File:
		fallthrough
	case constant.AtText:
		fallthrough
	case constant.Merger:
		fallthrough
	case constant.Card:
		fallthrough
	case constant.Location:
		fallthrough
	case constant.Custom:
		fallthrough
	case constant.Quote:
		utils.SetSwitchFromOptions(msg.Options, constant.IsConversationUpdate, true)
		utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, true)
		utils.SetSwitchFromOptions(msg.Options, constant.IsSenderSync, true)
	case constant.Revoke:
		utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false)
		utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false)
	case constant.HasReadReceipt:
		utils.SetSwitchFromOptions(msg.Options, constant.IsConversationUpdate, false)
		utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false)
		utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false)
	case constant.Typing:
		utils.SetSwitchFromOptions(msg.Options, constant.IsHistory, false)
		utils.SetSwitchFromOptions(msg.Options, constant.IsPersistent, false)
		utils.SetSwitchFromOptions(msg.Options, constant.IsSenderSync, false)
		utils.SetSwitchFromOptions(msg.Options, constant.IsConversationUpdate, false)
		utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false)
		utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false)

	}
}
func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) {
	replay := pbChat.SendMsgResp{}
95 96 97
	log.NewDebug(pb.OperationID, "rpc sendMsg come here", pb.String())
	//if !utils.VerifyToken(pb.Token, pb.SendID) {
	//	return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0)
programor_guo's avatar
programor_guo 已提交
98 99 100 101
	rpc.encapsulateMsgData(pb.MsgData)
	msgToMQ := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID}
	//options := utils.JsonStringToMap(pbData.Options)
	isHistory := utils.GetSwitchFromOptions(pb.MsgData.Options, constant.IsHistory)
102
	mReq := MsgCallBackReq{
programor_guo's avatar
programor_guo 已提交
103 104 105 106 107 108 109 110 111
		SendID:      pb.MsgData.SendID,
		RecvID:      pb.MsgData.RecvID,
		Content:     string(pb.MsgData.Content),
		SendTime:    pb.MsgData.SendTime,
		MsgFrom:     pb.MsgData.MsgFrom,
		ContentType: pb.MsgData.ContentType,
		SessionType: pb.MsgData.SessionType,
		PlatformID:  pb.MsgData.SenderPlatformID,
		MsgID:       pb.MsgData.ClientMsgID,
112 113 114 115 116
	}
	if !isHistory {
		mReq.IsOnlineOnly = true
	}
	mResp := MsgCallBackResp{}
落凡尘.'s avatar
落凡尘. 已提交
117
	if config.Config.MessageCallBack.CallbackSwitch {
118
		bMsg, err := http2.Post(config.Config.MessageCallBack.CallbackUrl, mReq, config.Config.MessageCallBack.CallBackTimeOut)
落凡尘.'s avatar
落凡尘. 已提交
119 120 121
		if err != nil {
			log.ErrorByKv("callback to Business server err", pb.OperationID, "args", pb.String(), "err", err.Error())
			return returnMsg(&replay, pb, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), "", 0)
122
		} else if err = json.Unmarshal(bMsg, &mResp); err != nil {
落凡尘.'s avatar
落凡尘. 已提交
123 124 125
			log.ErrorByKv("ws json Unmarshal err", pb.OperationID, "args", pb.String(), "err", err.Error())
			return returnMsg(&replay, pb, 200, err.Error(), "", 0)
		} else {
126 127
			if mResp.ErrCode != 0 {
				return returnMsg(&replay, pb, mResp.ResponseErrCode, mResp.ErrMsg, "", 0)
落凡尘.'s avatar
落凡尘. 已提交
128
			} else {
programor_guo's avatar
programor_guo 已提交
129
				pb.MsgData.Content = []byte(mResp.ResponseResult.ModifiedMsg)
落凡尘.'s avatar
落凡尘. 已提交
130 131
			}
		}
Z
Zzr 已提交
132
	}
programor_guo's avatar
programor_guo 已提交
133
	switch pb.MsgData.SessionType {
Z
Zzr 已提交
134
	case constant.SingleChatType:
programor_guo's avatar
programor_guo 已提交
135
		isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb)
136
		if isSend {
programor_guo's avatar
programor_guo 已提交
137 138
			msgToMQ.MsgData = pb.MsgData
			err1 := rpc.sendMsgToKafka(&msgToMQ, msgToMQ.MsgData.RecvID)
139
			if err1 != nil {
programor_guo's avatar
programor_guo 已提交
140
				log.NewError(msgToMQ.OperationID, "kafka send msg err:RecvID", msgToMQ.MsgData.RecvID, msgToMQ.String())
141 142 143
				return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
			}
		}
programor_guo's avatar
programor_guo 已提交
144 145 146 147 148 149
		if msgToMQ.MsgData.SendID != msgToMQ.MsgData.RecvID { //Filter messages sent to yourself
			err2 := rpc.sendMsgToKafka(&msgToMQ, msgToMQ.MsgData.SendID)
			if err2 != nil {
				log.NewError(msgToMQ.OperationID, "kafka send msg err:SendID", msgToMQ.MsgData.SendID, msgToMQ.String())
				return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
			}
Z
Zzr 已提交
150
		}
programor_guo's avatar
programor_guo 已提交
151
		return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime)
Z
Zzr 已提交
152 153 154 155
	case constant.GroupChatType:
		etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName)
		client := pbGroup.NewGroupClient(etcdConn)
		req := &pbGroup.GetGroupAllMemberReq{
programor_guo's avatar
programor_guo 已提交
156 157
			GroupID:     pb.MsgData.GroupID,
			OperationID: pb.OperationID,
Z
Zzr 已提交
158 159 160
		}
		reply, err := client.GetGroupAllMember(context.Background(), req)
		if err != nil {
programor_guo's avatar
programor_guo 已提交
161
			log.Error(pb.Token, pb.OperationID, "rpc send_msg getGroupInfo failed, err = %s", err.Error())
Z
Zzr 已提交
162 163
			return returnMsg(&replay, pb, 201, err.Error(), "", 0)
		}
programor_guo's avatar
programor_guo 已提交
164 165 166
		if reply.ErrCode != 0 {
			log.Error(pb.Token, pb.OperationID, "rpc send_msg getGroupInfo failed, err = %s", reply.ErrMsg)
			return returnMsg(&replay, pb, reply.ErrCode, reply.ErrMsg, "", 0)
Z
Zzr 已提交
167
		}
programor_guo's avatar
programor_guo 已提交
168 169
		groupID := pb.MsgData.GroupID
		for _, v := range reply.MemberList {
programor_guo's avatar
programor_guo 已提交
170 171
			pb.MsgData.RecvID = v.UserID
			isSend := modifyMessageByUserMessageReceiveOpt(v.UserID, groupID, constant.GroupChatType, pb)
172
			if isSend {
programor_guo's avatar
programor_guo 已提交
173
				msgToMQ.MsgData = pb.MsgData
programor_guo's avatar
programor_guo 已提交
174
				err := rpc.sendMsgToKafka(&msgToMQ, v.UserID)
175
				if err != nil {
programor_guo's avatar
programor_guo 已提交
176
					log.NewError(msgToMQ.OperationID, "kafka send msg err:UserId", v.UserID, msgToMQ.String())
177 178
					return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
				}
Z
Zzr 已提交
179
			}
180

Z
Zzr 已提交
181
		}
programor_guo's avatar
programor_guo 已提交
182
		return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime)
Z
Zzr 已提交
183
	default:
184
		return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0)
W
wenxu12345 已提交
185
	}
H
hailong 已提交
186
}
W
pb  
wenxu12345 已提交
187

programor_guo's avatar
programor_guo 已提交
188
func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string) error {
H
hailong 已提交
189 190
	pid, offset, err := rpc.producer.SendMessage(m, key)
	if err != nil {
落凡尘.'s avatar
落凡尘. 已提交
191
		log.ErrorByKv("kafka send failed", m.OperationID, "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key)
H
hailong 已提交
192
	}
programor_guo's avatar
programor_guo 已提交
193
	return err
H
hailong 已提交
194 195 196 197 198
}
func GetMsgID(sendID string) string {
	t := time.Now().Format("2006-01-02 15:04:05")
	return t + "-" + sendID + "-" + strconv.Itoa(rand.Int())
}
W
wenxu12345 已提交
199

programor_guo's avatar
programor_guo 已提交
200
func returnMsg(replay *pbChat.SendMsgResp, pb *pbChat.SendMsgReq, errCode int32, errMsg, serverMsgID string, sendTime int64) (*pbChat.SendMsgResp, error) {
落凡尘.'s avatar
落凡尘. 已提交
201 202 203
	replay.ErrCode = errCode
	replay.ErrMsg = errMsg
	replay.ServerMsgID = serverMsgID
programor_guo's avatar
programor_guo 已提交
204
	replay.ClientMsgID = pb.MsgData.ClientMsgID
落凡尘.'s avatar
落凡尘. 已提交
205 206 207
	replay.SendTime = sendTime
	return replay, nil
}
W
wenxu12345 已提交
208

programor_guo's avatar
programor_guo 已提交
209
func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType int, pb *pbChat.SendMsgReq) bool {
210
	conversationID := utils.GetConversationIDBySessionType(sourceID, sessionType)
211
	opt, err := db.DB.GetSingleConversationMsgOpt(userID, conversationID)
programor_guo's avatar
programor_guo 已提交
212
	if err != nil || err != redis.ErrNil {
programor_guo's avatar
programor_guo 已提交
213
		log.NewError(pb.OperationID, "GetSingleConversationMsgOpt from redis err", conversationID, pb.String(), err.Error())
214 215 216 217 218 219 220 221
		return true
	}
	switch opt {
	case constant.ReceiveMessage:
		return true
	case constant.NotReceiveMessage:
		return false
	case constant.ReceiveNotNotifyMessage:
programor_guo's avatar
programor_guo 已提交
222 223
		if pb.MsgData.Options == nil {
			pb.MsgData.Options = make(map[string]bool, 10)
programor_guo's avatar
programor_guo 已提交
224
		}
programor_guo's avatar
programor_guo 已提交
225
		utils.SetSwitchFromOptions(pb.MsgData.Options, constant.IsOfflinePush, false)
226 227 228 229 230
		return true
	}

	return true
}
W
wenxu12345 已提交
231 232 233 234

type NotificationMsg struct {
	SendID      string
	RecvID      string
W
wenxu12345 已提交
235
	Content     []byte //  open_im_sdk.TipsComm
W
wenxu12345 已提交
236 237 238 239 240 241
	MsgFrom     int32
	ContentType int32
	SessionType int32
	OperationID string
}

W
wenxu12345 已提交
242
func Notification(n *NotificationMsg) {
programor_guo's avatar
programor_guo 已提交
243 244 245
	var req pbChat.SendMsgReq
	var msg sdk_ws.MsgData
	var offlineInfo sdk_ws.OfflinePushInfo
programor_guo's avatar
programor_guo 已提交
246
	var title, desc, ex string
programor_guo's avatar
programor_guo 已提交
247 248 249 250 251 252 253 254
	var pushSwitch bool
	req.OperationID = n.OperationID
	msg.SendID = n.SendID
	msg.RecvID = n.RecvID
	msg.Content = n.Content
	msg.MsgFrom = n.MsgFrom
	msg.ContentType = n.ContentType
	msg.SessionType = n.SessionType
programor_guo's avatar
programor_guo 已提交
255
	msg.CreateTime = utils.GetCurrentTimestampByMill()
programor_guo's avatar
programor_guo 已提交
256 257 258 259 260 261
	msg.ClientMsgID = utils.GetMsgID(n.SendID)
	switch n.SessionType {
	case constant.GroupChatType:
		msg.RecvID = ""
		msg.GroupID = n.RecvID
	}
W
wenxu12345 已提交
262
	if true {
programor_guo's avatar
programor_guo 已提交
263
		msg.Options = make(map[string]bool, 10)
programor_guo's avatar
programor_guo 已提交
264
		//utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false)
programor_guo's avatar
test  
programor_guo 已提交
265 266
		//utils.SetSwitchFromOptions(msg.Options, constant.IsHistory, false)
		//utils.SetSwitchFromOptions(msg.Options, constant.IsPersistent, false)
programor_guo's avatar
programor_guo 已提交
267 268 269
	}
	offlineInfo.IOSBadgeCount = config.Config.IOSPush.BadgeCount
	offlineInfo.IOSPushSound = config.Config.IOSPush.PushSound
W
wenxu12345 已提交
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
	//switch msg.ContentType {
	//case constant.GroupCreatedNotification:
	//	pushSwitch = config.Config.Notification.GroupCreated.OfflinePush.PushSwitch
	//	title = config.Config.Notification.GroupCreated.OfflinePush.Title
	//	desc = config.Config.Notification.GroupCreated.OfflinePush.Desc
	//	ex = config.Config.Notification.GroupCreated.OfflinePush.Ext
	//case constant.GroupInfoChangedNotification:
	//	pushSwitch = config.Config.Notification.GroupInfoChanged.OfflinePush.PushSwitch
	//	title = config.Config.Notification.GroupInfoChanged.OfflinePush.Title
	//	desc = config.Config.Notification.GroupInfoChanged.OfflinePush.Desc
	//	ex = config.Config.Notification.GroupInfoChanged.OfflinePush.Ext
	//case constant.JoinApplicationNotification:
	//	pushSwitch = config.Config.Notification.ApplyJoinGroup.OfflinePush.PushSwitch
	//	title = config.Config.Notification.ApplyJoinGroup.OfflinePush.Title
	//	desc = config.Config.Notification.ApplyJoinGroup.OfflinePush.Desc
	//	ex = config.Config.Notification.ApplyJoinGroup.OfflinePush.Ext
	//}
programor_guo's avatar
programor_guo 已提交
287 288 289
	utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, pushSwitch)
	offlineInfo.Title = title
	offlineInfo.Desc = desc
programor_guo's avatar
programor_guo 已提交
290
	offlineInfo.Ex = ex
programor_guo's avatar
programor_guo 已提交
291 292 293 294 295 296 297 298 299 300
	msg.OfflinePushInfo = &offlineInfo
	req.MsgData = &msg
	etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
	client := pbChat.NewChatClient(etcdConn)
	reply, err := client.SendMsg(context.Background(), &req)
	if err != nil {
		log.NewError(req.OperationID, "SendMsg rpc failed, ", req.String(), err.Error())
	} else if reply.ErrCode != 0 {
		log.NewError(req.OperationID, "SendMsg rpc failed, ", req.String())
	}
W
wenxu12345 已提交
301
}