Open-IM-Server is open source instant messaging Server.Backend in Go.
## Open-IM-Server: Open source Instant Messaging Server
package constant
const (
//group admin
GroupAdmin = 1
//feiend related
BlackListFlag = 1
NotFriendFlag = 0
FriendFlag = 1
//Websocket Protocol
WSGetNewestSeq = 1001
WSPullMsg = 1002
WSSendMsg = 1003
WSPushMsg = 2001
Text = 101
Picture = 102
Voice = 103
Video = 104
File = 105
SyncSenderMsg = 108
AddFriendTip = 201
AgreeAddFriendTip = 202
KickOnlineTip = 203
UserMsgType = 100
SysMsgType = 200
SingleChatType = 1
GroupChatType = 2
var ContentType2PushContent = map[int64]string{
Picture: "[picture]",
Voice: "[voice]",
Video: "[video]",
File: "[file]",
package im_mysql_msg_model
import (
pbMsg "Open_IM/src/proto/chat"
// ChatLog Chat information table structure
type ChatLog struct {
MsgId string `gorm:"primary_key"` // Chat history primary key ID
SendID string `gorm:"column:send_id"` // Send ID
RecvID string `gorm:"column:recv_id"` //Receive ID
SendTime time.Time `gorm:"column:send_time"` // Send time
SessionType int32 `gorm:"column:session_type"` // Session type
ContentType int32 `gorm:"column:content_type"` // Message content type
MsgFrom int32 `gorm:"column:msg_from"` // Source, user, system
Content string `gorm:"column:content"` // Chat content
SenderPlatformID int32 `gorm:"column:sender_platform_id"` //The sender's platform ID
Remark sql.NullString `gorm:"column:remark"` // remark
func InsertMessageToChatLog(msgData pbMsg.WSToMsgSvrChatMsg) error {
dbConn, err := db.DB.MysqlDB.DefaultGormDB()
if err != nil {
return err
chatLog := ChatLog{
MsgId: msgData.MsgID,
SendID: msgData.SendID,
RecvID: msgData.RecvID,
SendTime: utils.UnixSecondToTime(msgData.SendTime),
SessionType: msgData.SessionType,
ContentType: msgData.ContentType,
MsgFrom: msgData.MsgFrom,
Content: msgData.Content,
SenderPlatformID: msgData.PlatformID,
return dbConn.Table("chat_log").Create(chatLog).Error
package im_mysql_msg_model
import (
func getHashMsgDBAddr(userID string) string {
hCode := crc32.ChecksumIEEE([]byte(userID))
return config.Config.Mysql.DBAddress[hCode%uint32(len(config.Config.Mysql.DBAddress))]
func getHashMsgTableIndex(userID string) int {
hCode := crc32.ChecksumIEEE([]byte(userID))
return int(hCode % uint32(config.Config.Mysql.DBMsgTableNum))
func QueryUserMsgID(userID string) ([]string, error) {
dbAddress, dbTableIndex := getHashMsgDBAddr(userID), getHashMsgTableIndex(userID)
dbTableName := "receive" + strconv.Itoa(dbTableIndex)
dbConn, _ := db.DB.MysqlDB.GormDB(dbAddress, config.Config.Mysql.DBTableName)
var msgID string
var msgIDList []string
rows, _ := dbConn.Raw("select msg_id from ? where user_id = ?", dbTableName, userID).Rows()
defer rows.Close()
for rows.Next() {
msgIDList = append(msgIDList, msgID)
return msgIDList, nil
package im_mysql_msg_model
import (
// Receive Inbox table structure
type Receive struct {
UserId string `gorm:"primary_key"` // 收件箱主键ID
Seq int64 `gorm:"primary_key"` // 收件箱主键ID
MsgId string
CreateTime *time.Time
//func InsertMessageToReceive(seq int64, userid, msgid string) error {
// conn := db.NewDbConnection()
// receive := Receive{
// UID: userid,
// Seq: seq,
// MsgId: msgid,
// }
// err := conn.Table("receive").Create(&receive).Error
// return err
//func GetBiggestSeqFromReceive(userid string) (seq int64, err error) {
// //得到数据库的连接(并非真连接,调用时才连接,由gorm自动维护数据库连接池)
// conn := db.NewDbConnection()
// err = conn.Raw("select max(seq) from receive where user_id = ?", userid).Row().Scan(&seq)
// return seq, err
package kafka
import (
type Consumer struct {
addr []string
WG sync.WaitGroup
Topic string
PartitionList []int32
Consumer sarama.Consumer
func NewKafkaConsumer(addr []string, topic string) *Consumer {
p := Consumer{}
p.Topic = topic
p.addr = addr
consumer, err := sarama.NewConsumer(p.addr, nil)
if err != nil {
return nil
p.Consumer = consumer
partitionList, err := consumer.Partitions(p.Topic)
if err != nil {
return nil
p.PartitionList = partitionList
return &p
package kafka
import (
type MConsumerGroup struct {
groupID string
topics []string
type MConsumerGroupConfig struct {
KafkaVersion sarama.KafkaVersion
OffsetsInitial int64
IsReturnErr bool
func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addr []string, groupID string) *MConsumerGroup {
config := sarama.NewConfig()
config.Version = consumerConfig.KafkaVersion
config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial
config.Consumer.Return.Errors = consumerConfig.IsReturnErr
client, err := sarama.NewClient(addr, config)
if err != nil {
consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client)
if err != nil {
return &MConsumerGroup{
func (mc *MConsumerGroup) RegisterHandleAndConsumer(handler sarama.ConsumerGroupHandler) {
ctx := context.Background()
for {
err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler)
if err != nil {
package kafka
import (
log2 "Open_IM/src/common/log"
type Producer struct {
topic string
addr []string
config *sarama.Config
producer sarama.SyncProducer
func NewKafkaProducer(addr []string, topic string) *Producer {
p := Producer{}
p.config = sarama.NewConfig() //Instantiate a sarama Config
p.config.Producer.Return.Successes = true //Whether to enable the successes channel to be notified after the message is sent successfully
p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all
p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
p.addr = addr
p.topic = topic
producer, err := sarama.NewSyncProducer(p.addr, p.config) //初始化客户端
if err != nil {
return nil
p.producer = producer
return &p
func (p *Producer) SendMessage(m proto.Message, key ...string) (int32, int64, error) {
kMsg := &sarama.ProducerMessage{}
kMsg.Topic = p.topic
if len(key) == 1 {
kMsg.Key = sarama.StringEncoder(key[0])
bMsg, err := proto.Marshal(m)
if err != nil {
log2.Error("", "", "proto marshal err = %s", err.Error())
return -1, -1, err
kMsg.Value = sarama.ByteEncoder(bMsg)
return p.producer.SendMessage(kMsg)
package log
import (
elasticV7 "github.com/olivere/elastic/v7"
//esHook custom es hook
type esHook struct {
moduleName string
client *elasticV7.Client
//newEsHook initialization
func newEsHook(moduleName string) *esHook {
//client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"))
//if err != nil {
// log.Panic(err)
//hook, err := elogrus.NewAsyncElasticHook(client, "localhost", logrus.DebugLevel, "mylog")
//if err != nil {
// log.Panic(err)
es, err := elasticV7.NewClient(
elasticV7.SetBasicAuth(config.Config.Log.ElasticSearchUser, config.Config.Log.ElasticSearchPassword),
elasticV7.SetErrorLog(log.New(os.Stderr, "ES:", log.LstdFlags)),
if err != nil {
log.Fatal("failed to create Elastic V7 Client: ", err)
//info, code, err := es.Ping(logConfig.ElasticSearch.EsAddr[0]).Do(context.Background())
//if err != nil {
// panic(err)
//fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)
//esversion, err := es.ElasticsearchVersion(logConfig.ElasticSearch.EsAddr[0])
//if err != nil {
// panic(err)
//fmt.Printf("Elasticsearch version %s\n", esversion)
return &esHook{client: es, moduleName: moduleName}
//Fire log hook interface method
func (hook *esHook) Fire(entry *logrus.Entry) error {
doc := newEsLog(entry)
go hook.sendEs(doc)
return nil
//Levels log hook interface method, the log affected by this hook
func (hook *esHook) Levels() []logrus.Level {
return logrus.AllLevels
//sendEs Asynchronously send logs to es
func (hook *esHook) sendEs(doc appLogDocModel) {
defer func() {
if r := recover(); r != nil {
fmt.Println("send entry to es failed: ", r)
_, err := hook.client.Index().Index(hook.moduleName).Type(doc.indexName()).BodyJson(doc).Do(context.Background())
if err != nil {
//appLogDocModel es model
type appLogDocModel map[string]interface{}
func newEsLog(e *logrus.Entry) appLogDocModel {
ins := make(map[string]interface{})
ins["level"] = strings.ToUpper(e.Level.String())
ins["time"] = e.Time.Format("2006-01-02 15:04:05")
for kk, vv := range e.Data {
ins[kk] = vv
ins["tipInfo"] = e.Message
return ins
// indexName es index name time division
func (m *appLogDocModel) indexName() string {
return time.Now().Format("2006-01-02")
package log
import (
type fileHook struct{}
func newFileHook() *fileHook {
return &fileHook{}
func (f *fileHook) Levels() []logrus.Level {
return logrus.AllLevels
func (f *fileHook) Fire(entry *logrus.Entry) error {
entry.Data["FilePath"] = findCaller(5)
return nil
func findCaller(skip int) string {
file := ""
line := 0
for i := 0; i < 10; i++ {
file, line = getCaller(skip + i)
if !strings.HasPrefix(file, "log") {
return fmt.Sprintf("%s:%d", file, line)
func getCaller(skip int) (string, int) {
_, file, line, ok := runtime.Caller(skip)
if !ok {
return "", 0
n := 0
for i := len(file) - 1; i > 0; i-- {
if file[i] == '/' {
if n >= 2 {
file = file[i+1:]
return file, line
package log
import (
nested "github.com/antonfisher/nested-logrus-formatter"
rotatelogs "github.com/lestrrat-go/file-rotatelogs"
var logger *Logger
type Logger struct {
Pid int
func init() {
logger = loggerInit("")
func NewPrivateLog(moduleName string) {
logger = loggerInit(moduleName)
func loggerInit(moduleName string) *Logger {
var logger = logrus.New()
//All logs will be printed
//Log Style Setting
TimestampFormat: "2006-01-02 15:04:05",
HideKeys: false,
FieldsOrder: []string{"PID"},
//File name and line number display hook
//Send logs to elasticsearch hook
if config.Config.Log.ElasticSearchSwitch == true {
//Log file segmentation hook
hook := NewLfsHook(config.Config.Log.StorageLocation+time.Now().Format("2006-01-02")+".log", 0, 5, moduleName)
return &Logger{
func NewLfsHook(logName string, rotationTime time.Duration, maxRemainNum uint, moduleName string) logrus.Hook {
var fileNameSuffix string
if GetCurrentTimestamp() >= GetCurDayZeroTimestamp() && GetCurrentTimestamp() <= GetCurDayHalfTimestamp() {
fileNameSuffix = time.Now().Format("2006-01-02") + ".log"
} else {
fileNameSuffix = time.Now().Format("2006-01-02") + ".log"
writer, err := rotatelogs.New(
if err != nil {
writeInfo, err := rotatelogs.New(
writeError, err := rotatelogs.New(
writeDebug, err := rotatelogs.New(
writeWarn, err := rotatelogs.New(
if err != nil {
lfsHook := lfshook.NewHook(lfshook.WriterMap{
logrus.DebugLevel: writeDebug,
logrus.InfoLevel: writeInfo,
logrus.WarnLevel: writeWarn,
logrus.ErrorLevel: writeError,
logrus.FatalLevel: writer,
logrus.PanicLevel: writer,
}, &nested.Formatter{
TimestampFormat: "2006-01-02 15:04:05",
HideKeys: false,
FieldsOrder: []string{"PID"},
return lfsHook
func Info(token, OperationID, format string, args ...interface{}) {
if token == "" && OperationID == "" {
logger.WithFields(logrus.Fields{}).Infof(format, args...)
} else {
"token": token,
"OperationID": OperationID,
}).Infof(format, args...)
func Error(token, OperationID, format string, args ...interface{}) {
if token == "" && OperationID == "" {
logger.WithFields(logrus.Fields{}).Errorf(format, args...)
} else {
"token": token,
"OperationID": OperationID,
}).Errorf(format, args...)
func Debug(token, OperationID, format string, args ...interface{}) {
if token == "" && OperationID == "" {
logger.WithFields(logrus.Fields{}).Debugf(format, args...)
} else {
"token": token,
"OperationID": OperationID,
}).Debugf(format, args...)
func Warning(token, OperationID, format string, args ...interface{}) {
if token == "" && OperationID == "" {
logger.WithFields(logrus.Fields{}).Warningf(format, args...)
} else {
"token": token,
"OperationID": OperationID,
}).Warningf(format, args...)
func InfoByArgs(format string, args ...interface{}) {
logger.WithFields(logrus.Fields{}).Infof(format, args)
func ErrorByArgs(format string, args ...interface{}) {
logger.WithFields(logrus.Fields{}).Errorf(format, args...)
//Print log information in k, v format,
//kv is best to appear in pairs. tipInfo is the log prompt information for printing,
//and kv is the key and value for printing.
func InfoByKv(tipInfo, OperationID string, args ...interface{}) {
fields := make(logrus.Fields)
argsHandle(OperationID, fields, args)
func ErrorByKv(tipInfo, OperationID string, args ...interface{}) {
fields := make(logrus.Fields)
argsHandle(OperationID, fields, args)
func DebugByKv(tipInfo, OperationID string, args ...interface{}) {
fields := make(logrus.Fields)
argsHandle(OperationID, fields, args)
func WarnByKv(tipInfo, OperationID string, args ...interface{}) {
fields := make(logrus.Fields)
argsHandle(OperationID, fields, args)
//internal method
func argsHandle(OperationID string, fields logrus.Fields, args []interface{}) {
for i := 0; i < len(args); i += 2 {
if i+1 < len(args) {
fields[fmt.Sprintf("%v", args[i])] = args[i+1]
} else {
fields[fmt.Sprintf("%v", args[i])] = ""
fields["operationID"] = OperationID
fields["PID"] = logger.Pid
package log
import (
const (
TimeOffset = 8 * 3600 //8 hour offset
HalfOffset = 12 * 3600 //Half-day hourly offset
//Get the current timestamp
func GetCurrentTimestamp() int64 {
return time.Now().Unix()
//Get the timestamp at 0 o'clock of the day
func GetCurDayZeroTimestamp() int64 {
timeStr := time.Now().Format("2006-01-02")
t, _ := time.Parse("2006-01-02", timeStr)
return t.Unix() - TimeOffset
//Get the timestamp at 12 o'clock on the day
func GetCurDayHalfTimestamp() int64 {
return GetCurDayZeroTimestamp() + HalfOffset
//Get the formatted time at 0 o'clock of the day, the format is "2006-01-02_00-00-00"
func GetCurDayZeroTimeFormat() string {
return time.Unix(GetCurDayZeroTimestamp(), 0).Format("2006-01-02_15-04-05")
//Get the formatted time at 12 o'clock of the day, the format is "2006-01-02_12-00-00"
func GetCurDayHalfTimeFormat() string {
return time.Unix(GetCurDayZeroTimestamp()+HalfOffset, 0).Format("2006-01-02_15-04-05")
func GetTimeStampByFormat(datetime string) string {
timeLayout := "2006-01-02 15:04:05" //Template required for transformation
loc, _ := time.LoadLocation("Local") //Get time zone
tmp, _ := time.ParseInLocation(timeLayout, datetime, loc)
timestamp := tmp.Unix() //Converted to timestamp type is int64
return strconv.FormatInt(timestamp, 10)
func TimeStringFormatTimeUnix(timeFormat string, timeSrc string) int64 {
tm, _ := time.Parse(timeFormat, timeSrc)
return tm.Unix()
package main
import (
rpcAuth "Open_IM/src/rpc/auth/auth"
func main() {
rpcPort := flag.Int("port", 10600, "RpcToken default listen port 10800")
rpcServer := rpcAuth.NewRpcAuthServer(*rpcPort)
.PHONY: all build run gotool install clean help
all: gotool build
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-w -s" -o ${BINARY_NAME} ${GO_FILE}
@go run ./
go fmt ./
go vet ./
make build
@if [ -f ${BINARY_NAME} ] ; then rm ${BINARY_NAME} ; fi
package rpcChat
import (
commonDB "Open_IM/src/common/db"
pbMsg "Open_IM/src/proto/chat"
func (rpc *rpcChat) GetNewSeq(_ context.Context, in *pbMsg.GetNewSeqReq) (*pbMsg.GetNewSeqResp, error) {
log.InfoByKv("rpc getNewSeq is arriving", in.OperationID, in.String())
//seq, err := model.GetBiggestSeqFromReceive(in.UserID)
seq, err := commonDB.DB.GetUserSeq(in.UserID)
resp := new(pbMsg.GetNewSeqResp)
if err == nil {
resp.Seq = seq
resp.ErrCode = 0
resp.ErrMsg = ""
return resp, err
} else {
log.ErrorByKv("getSeq from redis error", in.OperationID, "args", in.String(), "err", err.Error())
resp.Seq = 0
resp.ErrCode = 0
resp.ErrMsg = ""
return resp, nil
//func (s *MsgServer) PullMessage(_ context.Context, in *pbMsg.PullMessageReq) (*pbMsg.PullMessageResp, error) {
// log.InfoByArgs(fmt.Sprintf("rpc pullMessage is arriving,args=%s", in.String()))
// resp := new(pbMsg.PullMessageResp)
// var respMsgFormat []*pbMsg.MsgFormat
// var respUserMsgFormat []*pbMsg.UserMsgFormat
// conn := db.NewDbConnection()
// rows, err := conn.Table("receive r").Select("c.sender_id,c.receiver_id,"+
// "c.msg_type,c.push_msg_type,c.chat_type,c.msg_id,c.send_content,r.seq,c.send_time,c.sender_nickname,c.receiver_nickname,c.sender_head_url,c.receiver_head_url").
// Joins("INNER JOIN chat_log c ON r.msg_id = c.msg_id AND r.user_id = ? AND seq BETWEEN ? AND ?",
// in.UserID, in.SeqBegin, in.SeqEnd).Rows()
// if err != nil {
// fmt.Printf("pullMsg data error: %v\n", err)
// resp.ErrCode = 1
// resp.ErrMsg = err.Error()
// return resp, nil
// }
// defer rows.Close()
// for rows.Next() {
// tempResp := new(pbMsg.MsgFormat)
// rows.Scan(&tempResp.SendID, &tempResp.RecvID, &tempResp.MsgType, &tempResp.PushMsgType, &tempResp.ChatType,
// &tempResp.MsgID, &tempResp.Msg, &tempResp.Seq, &tempResp.Time, &tempResp.SendNickName, &tempResp.RecvNickName,
// &tempResp.SendHeadUrl, &tempResp.RecvHeadUrl)
// respMsgFormat = append(respMsgFormat, tempResp)
// }
// respUserMsgFormat = msgHandleByUser(respMsgFormat, in.UserID)
// return &pbMsg.PullMessageResp{
// ErrCode: 0,
// ErrMsg: "",
// UserMsg: respUserMsgFormat,
// }, nil
func (rpc *rpcChat) PullMessage(_ context.Context, in *pbMsg.PullMessageReq) (*pbMsg.PullMessageResp, error) {
log.InfoByKv("rpc pullMessage is arriving", in.OperationID, "args", in.String())
resp := new(pbMsg.PullMessageResp)
var respSingleMsgFormat []*pbMsg.GatherFormat
var respGroupMsgFormat []*pbMsg.GatherFormat
SingleMsgFormat, GroupMsgFormat, MaxSeq, MinSeq, err := commonDB.DB.GetUserChat(in.UserID, in.SeqBegin, in.SeqEnd)
if err != nil {
log.ErrorByKv("pullMsg data error", in.OperationID, in.String())
resp.ErrCode = 1
resp.ErrMsg = err.Error()
return resp, nil
respSingleMsgFormat = singleMsgHandleByUser(SingleMsgFormat, in.UserID)
respGroupMsgFormat = groupMsgHandleByUser(GroupMsgFormat)
return &pbMsg.PullMessageResp{
ErrCode: 0,
ErrMsg: "",
MaxSeq: MaxSeq,
MinSeq: MinSeq,
SingleUserMsg: respSingleMsgFormat,
GroupUserMsg: respGroupMsgFormat,
}, nil
func singleMsgHandleByUser(allMsg []*pbMsg.MsgFormat, ownerId string) []*pbMsg.GatherFormat {
var userid string
var respMsgFormat []*pbMsg.GatherFormat
m := make(map[string]MsgFormats)
for _, v := range allMsg {
if v.RecvID != ownerId {
userid = v.RecvID
} else {
userid = v.SendID
if value, ok := m[userid]; !ok {
var t MsgFormats
m[userid] = append(t, v)
} else {
m[userid] = append(value, v)
for user, msg := range m {
tempUserMsg := new(pbMsg.GatherFormat)
tempUserMsg.ID = user
tempUserMsg.List = msg
respMsgFormat = append(respMsgFormat, tempUserMsg)
return respMsgFormat
func groupMsgHandleByUser(allMsg []*pbMsg.MsgFormat) []*pbMsg.GatherFormat {
var respMsgFormat []*pbMsg.GatherFormat
m := make(map[string]MsgFormats)
for _, v := range allMsg {
groupID := strings.Split(v.RecvID, " ")[1]
if value, ok := m[groupID]; !ok {
var t MsgFormats
m[groupID] = append(t, v)
} else {
m[groupID] = append(value, v)
for groupID, msg := range m {
tempUserMsg := new(pbMsg.GatherFormat)
tempUserMsg.ID = groupID
tempUserMsg.List = msg
respMsgFormat = append(respMsgFormat, tempUserMsg)
return respMsgFormat
type MsgFormats []*pbMsg.MsgFormat
// 实现sort.Interface接口取元素数量方法
func (s MsgFormats) Len() int {
return len(s)
// 实现sort.Interface接口比较元素方法
func (s MsgFormats) Less(i, j int) bool {
return s[i].SendTime < s[j].SendTime
// 实现sort.Interface接口交换元素方法
func (s MsgFormats) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
package rpcChat
import (
log2 "Open_IM/src/common/log"
pbChat "Open_IM/src/proto/chat"
type rpcChat struct {
rpcPort int
rpcRegisterName string
etcdSchema string
etcdAddr []string
producer *kafka.Producer
func NewRpcChatServer(port int) *rpcChat {
rc := rpcChat{
rpcPort: port,
rpcRegisterName: config.Config.RpcRegisterName.OpenImOfflineMessageName,
etcdSchema: config.Config.Etcd.EtcdSchema,
etcdAddr: config.Config.Etcd.EtcdAddr,
rc.producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic)
return &rc
func (rpc *rpcChat) Run() {
log2.Info("", "", "rpc get_token init...")
address := utils.ServerIP + ":" + strconv.Itoa(rpc.rpcPort)
listener, err := net.Listen("tcp", address)
if err != nil {
log2.Error("", "", "listen network failed, err = %s, address = %s", err.Error(), address)
log2.Info("", "", "listen network success, address = %s", address)
//grpc server
srv := grpc.NewServer()
defer srv.GracefulStop()
//service registers with etcd
pbChat.RegisterChatServer(srv, rpc)
err = getcdv3.RegisterEtcd(rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), utils.ServerIP, rpc.rpcPort, rpc.rpcRegisterName, 10)
if err != nil {
log2.Error("", "", "register rpc get_token to etcd failed, err = %s", err.Error())
err = srv.Serve(listener)
if err != nil {
log2.Info("", "", "rpc get_token fail, err = %s", err.Error())
log2.Info("", "", "rpc get_token init success")
package rpcChat
import (
pbChat "Open_IM/src/proto/chat"
func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*pbChat.UserSendMsgResp, error) {
serverMsgID := GetMsgID(pb.SendID)
pbData := pbChat.WSToMsgSvrChatMsg{}
pbData.MsgFrom = pb.MsgFrom
pbData.SessionType = pb.SessionType
pbData.ContentType = pb.ContentType
pbData.Content = pb.Content
pbData.RecvID = pb.RecvID
pbData.ForceList = pb.ForceList
pbData.OfflineInfo = pb.OffLineInfo
pbData.Options = pb.Options
pbData.PlatformID = pb.PlatformID
pbData.SendID = pb.SendID
pbData.MsgID = serverMsgID
pbData.OperationID = pb.OperationID
pbData.Token = pb.Token
pbData.SendTime = utils.GetCurrentTimestampBySecond()
rpc.sendMsgToKafka(&pbData, pbData.RecvID)
rpc.sendMsgToKafka(&pbData, pbData.SendID)
replay := pbChat.UserSendMsgResp{}
replay.ReqIdentifier = pb.ReqIdentifier
replay.MsgIncr = pb.MsgIncr
replay.ClientMsgID = pb.ClientMsgID
replay.ServerMsgID = serverMsgID
return &replay, nil
func (rpc *rpcChat) sendMsgToKafka(m *pbChat.WSToMsgSvrChatMsg, key string) {
pid, offset, err := rpc.producer.SendMessage(m, key)
if err != nil {
log.ErrorByKv("kafka send failed", m.OperationID, "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error())
func GetMsgID(sendID string) string {
t := time.Now().Format("2006-01-02 15:04:05")
return t + "-" + sendID + "-" + strconv.Itoa(rand.Int())
package main
import (
rpcChat "Open_IM/src/rpc/chat/chat"
func main() {
rpcPort := flag.String("port", "", "rpc listening port")
rpcServer := rpcChat.NewRpcChatServer(utils.StringToInt(*rpcPort))
