event.go 3.5 KB
Newer Older
T
tanggen 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
// Copyright (C) 2020 Finogeeks Co., Ltd
//
// This program is free software: you can redistribute it and/or  modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package rpc

import (
	"context"
	"encoding/json"

	"github.com/finogeeks/ligase/common"
	"github.com/finogeeks/ligase/common/config"
	"github.com/finogeeks/ligase/model/types"
	"github.com/finogeeks/ligase/rcsserver/processors"
	"github.com/finogeeks/ligase/skunkworks/gomatrixserverlib"
	"github.com/finogeeks/ligase/skunkworks/log"
	"github.com/nats-io/go-nats"
)

// EventConsumer receives RCS input events through an RPC client and spreads
// them over a fixed number of buffered channels, each drained by its own
// worker goroutine.
type EventConsumer struct {
	// rpcClient is used to attach the topic callback and to publish replies.
	rpcClient *common.RpcClient
	// proc handles the create and membership events.
	proc *processors.EventProcessor

	// Disabled dependencies, kept for reference:
	//db        model.RCSServerDatabase
	//repo      *repos.RCSServerRepo

	// cfg is the configuration handed to NewEventConsumer.
	cfg *config.Dendrite
	// slot is the number of worker channels; an event's slot is its room ID
	// hash modulo this value.
	slot uint32
	// chanSize is the buffer capacity of every worker channel.
	chanSize uint32

	// Former element type, kept for reference:
	//msgChan  []chan *types.RCSInputEventContent

	// msgChan holds one channel per slot and is allocated by Start.
	msgChan []chan common.ContextMsg
}

// NewEventConsumer builds an EventConsumer around the given configuration,
// RPC client and event processor. The consumer is set up with 64 slots and a
// per-slot channel capacity of 1024; the channels themselves are not created
// here.
func NewEventConsumer(
	cfg *config.Dendrite,
	rpcClient *common.RpcClient,
	proc *processors.EventProcessor,
	//db model.RCSServerDatabase,
	//repo *repos.RCSServerRepo,
) *EventConsumer {
	const (
		workerSlots    = 64   // Low frequency.
		workerChanSize = 1024 // Todo: use linked list.
	)

	consumer := new(EventConsumer)
	consumer.rpcClient = rpcClient
	consumer.proc = proc
	//consumer.db = db
	//consumer.repo = repo
	consumer.cfg = cfg
	consumer.slot = workerSlots
	consumer.chanSize = workerChanSize
	return consumer
}

// GetTopic reports the topic this consumer listens on, which is
// types.RCSEventTopicDef.
func (s *EventConsumer) GetTopic() string {
	topic := types.RCSEventTopicDef
	return topic
}

// cb is the callback invoked for each incoming message. It decodes the
// payload into a types.RCSInputEventContent, copies the message's reply
// subject onto it, and queues it on the worker channel selected by the hash of
// the event's room ID. A payload that cannot be decoded is logged and dropped;
// no reply is published for it.
func (s *EventConsumer) cb(ctx context.Context, msg *nats.Msg) {
	input := new(types.RCSInputEventContent)
	err := json.Unmarshal(msg.Data, input)
	if err != nil {
		log.Errorf("Failed to unmarshal nats.Msg to gomatrixserverlib.Event: %v\n", err)
		return
	}
	input.Reply = msg.Reply

	// TODO: glare situation.
	// The slot depends only on the room ID hash, so events of one room are
	// queued on the same channel. The send blocks while that channel is full.
	roomHash := common.CalcStringHashCode(input.Event.RoomID())
	queue := s.msgChan[roomHash%s.slot]
	queue <- common.ContextMsg{Ctx: ctx, Msg: input}
}

// Start allocates one buffered channel per slot, launches a worker goroutine
// on each, and then passes cb to rpcClient.ReplyGrpWithContext for the
// consumer's topic and types.RCSSERVER_RPC_GROUP. It always returns nil.
//
// The workers run until their channel is closed; nothing in this method
// closes them.
func (s *EventConsumer) Start() error {
	s.msgChan = make([]chan common.ContextMsg, s.slot)
	for i := range s.msgChan {
		queue := make(chan common.ContextMsg, s.chanSize)
		s.msgChan[i] = queue
		go s.startWorker(queue)
	}

	topic := s.GetTopic()
	s.rpcClient.ReplyGrpWithContext(topic, types.RCSSERVER_RPC_GROUP, s.cb)
	return nil
}

// startWorker drains msgChan until it is closed, passing every queued message
// to handleEvent. Each message must carry a *types.RCSInputEventContent; any
// other payload type makes the type assertion panic.
func (s *EventConsumer) startWorker(msgChan chan common.ContextMsg) {
	for queued := range msgChan {
		input := queued.Msg.(*types.RCSInputEventContent)
		s.handleEvent(queued.Ctx, input)
	}
}

// handleEvent processes one queued RCS input event and publishes the outcome
// on the event's reply subject.
//
// Events of type gomatrixserverlib.MRoomCreate go to the processor's
// HandleCreate and events of type gomatrixserverlib.MRoomMember go to
// HandleMembership; every other event is returned unchanged. The published
// types.RCSOutputEventContent carries the resulting events, and its Succeed
// flag is false when the processor returned an error.
func (s *EventConsumer) handleEvent(ctx context.Context, cont *types.RCSInputEventContent) {
	// The JSON form is only used in log lines, so a marshal failure is
	// reported but does not stop the event from being handled.
	ev, marshalErr := json.Marshal(cont.Event)
	if marshalErr != nil {
		log.Errorf("Failed to marshal event for logging: %v\n", marshalErr)
	}
	log.Infof("rcsserver=====================EventConsumer.handleEvent, RCS Server receive event: %s\n", string(ev))

	var (
		evs []gomatrixserverlib.Event
		err error
	)
	switch cont.Event.Type() {
	case gomatrixserverlib.MRoomCreate:
		evs, err = s.proc.HandleCreate(ctx, &cont.Event)
	case gomatrixserverlib.MRoomMember:
		evs, err = s.proc.HandleMembership(ctx, &cont.Event)
	default:
		// Other event types need no processing and are passed through.
		evs = append(evs, cont.Event)
	}

	if err != nil {
		log.Errorf("Failed to handle event, event=%s, error: %v\n", string(ev), err)
	}

	// A reply is published on failure as well; the requester tells the
	// outcome from the Succeed flag.
	s.rpcClient.PubObj(cont.Reply, types.RCSOutputEventContent{
		Events:  evs,
		Succeed: err == nil,
	})
}