提交 cea9e317 编写于 作者: M Mark Haines 提交者: GitHub

Make the roomserver output format more flexible (#155)

* Make the roomserver output format more flexible

* Fix the sync server integration testdata

* Fix roomserver testdata

* Actually fix the sync server tests

* Fix typo
上级 7d36ca03
......@@ -310,8 +310,8 @@ func main() {
}
want := []string{
`{
"Event":{
`{"type":"new_room_event","new_room_event":{
"event":{
"auth_events":[[
"$1463671337126266wrSBX:matrix.org",{"sha256":"h/VS07u8KlMwT3Ee8JhpkC7sa1WUs0Srgs+l3iBv6c0"}
]],
......@@ -340,14 +340,14 @@ func main() {
"state_key":"@richvdh:matrix.org",
"type":"m.room.member"
},
"StateBeforeRemovesEventIDs":["$1463671339126270PnVwC:matrix.org"],
"StateBeforeAddsEventIDs":null,
"LatestEventIDs":["$1463671339126270PnVwC:matrix.org"],
"AddsStateEventIDs":["$1463671337126266wrSBX:matrix.org", "$1463671339126270PnVwC:matrix.org"],
"RemovesStateEventIDs":null,
"LastSentEventID":"",
"SendAsServer":""
}`,
"state_before_removes_event_ids":["$1463671339126270PnVwC:matrix.org"],
"state_before_adds_event_ids":null,
"latest_event_ids":["$1463671339126270PnVwC:matrix.org"],
"adds_state_event_ids":["$1463671337126266wrSBX:matrix.org", "$1463671339126270PnVwC:matrix.org"],
"removes_state_event_ids":null,
"last_sent_event_id":"",
"send_as_server":""
}}`,
}
testRoomserver(input, want, func(q api.RoomserverQueryAPI) {
......
......@@ -98,15 +98,13 @@ func createTestUser(database, username, token string) error {
// trimmed to the client format and then canonicalised and returned as a string.
// Panics if there are any problems.
func clientEventJSONForOutputRoomEvent(outputRoomEvent string) string {
var out api.OutputRoomEvent
var out api.OutputEvent
if err := json.Unmarshal([]byte(outputRoomEvent), &out); err != nil {
panic("failed to unmarshal output room event: " + err.Error())
}
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(out.Event, false)
if err != nil {
panic("failed to convert event field in output room event to Event: " + err.Error())
}
clientEvs := gomatrixserverlib.ToClientEvents([]gomatrixserverlib.Event{ev}, gomatrixserverlib.FormatSync)
clientEvs := gomatrixserverlib.ToClientEvents([]gomatrixserverlib.Event{
out.NewRoomEvent.Event,
}, gomatrixserverlib.FormatSync)
b, err := json.Marshal(clientEvs[0])
if err != nil {
panic("failed to marshal client event as json: " + err.Error())
......
......@@ -72,31 +72,32 @@ func (s *OutputRoomEvent) Start() error {
// realises that it cannot update the room state using the deltas.
func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON
var output api.OutputRoomEvent
var output api.OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.Event, false)
if err != nil {
log.WithError(err).Errorf("roomserver output log: event parse failure")
if output.Type != api.OutputTypeNewRoomEvent {
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
}
ev := &output.NewRoomEvent.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"send_as_server": output.SendAsServer,
"send_as_server": output.NewRoomEvent.SendAsServer,
}).Info("received event from roomserver")
if err = s.processMessage(output, ev); err != nil {
if err := s.processMessage(*output.NewRoomEvent); err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event": string(ev.JSON()),
log.ErrorKey: err,
"add": output.AddsStateEventIDs,
"del": output.RemovesStateEventIDs,
"add": output.NewRoomEvent.AddsStateEventIDs,
"del": output.NewRoomEvent.RemovesStateEventIDs,
}).Panicf("roomserver output log: write event failure")
return nil
}
......@@ -106,8 +107,8 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
// processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEvent) processMessage(ore api.OutputRoomEvent, ev gomatrixserverlib.Event) error {
addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ev)
func (s *OutputRoomEvent) processMessage(ore api.OutputNewRoomEvent) error {
addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event)
if err != nil {
return err
}
......@@ -121,7 +122,7 @@ func (s *OutputRoomEvent) processMessage(ore api.OutputRoomEvent, ev gomatrixser
// TODO: handle EventIDMismatchError and recover the current state by talking
// to the roomserver
oldJoinedHosts, err := s.db.UpdateRoom(
ev.RoomID(), ore.LastSentEventID, ev.EventID(),
ore.Event.RoomID(), ore.LastSentEventID, ore.Event.EventID(),
addsJoinedHosts, ore.RemovesStateEventIDs,
)
if err != nil {
......@@ -134,14 +135,14 @@ func (s *OutputRoomEvent) processMessage(ore api.OutputRoomEvent, ev gomatrixser
}
// Work out which hosts were joined at the event itself.
joinedHostsAtEvent, err := s.joinedHostsAtEvent(ore, ev, oldJoinedHosts)
joinedHostsAtEvent, err := s.joinedHostsAtEvent(ore, oldJoinedHosts)
if err != nil {
return err
}
// Send the event.
if err = s.queues.SendEvent(
&ev, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent,
&ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent,
); err != nil {
return err
}
......@@ -159,7 +160,7 @@ func (s *OutputRoomEvent) processMessage(ore api.OutputRoomEvent, ev gomatrixser
// events from the room server.
// Returns an error if there was a problem talking to the room server.
func (s *OutputRoomEvent) joinedHostsAtEvent(
ore api.OutputRoomEvent, ev gomatrixserverlib.Event, oldJoinedHosts []types.JoinedHost,
ore api.OutputNewRoomEvent, oldJoinedHosts []types.JoinedHost,
) ([]gomatrixserverlib.ServerName, error) {
// Combine the delta into a single delta so that the adds and removes can
// cancel each other out. This should reduce the number of times we need
......@@ -168,7 +169,7 @@ func (s *OutputRoomEvent) joinedHostsAtEvent(
ore.AddsStateEventIDs, ore.RemovesStateEventIDs,
ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs,
)
combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ev)
combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ore.Event)
if err != nil {
return nil, err
}
......
......@@ -15,10 +15,25 @@
package api
import (
"encoding/json"
"github.com/matrix-org/gomatrixserverlib"
)
// An OutputRoomEvent is written when the roomserver receives a new event.
// An OutputType is a type of roomserver output.
type OutputType string
// OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent
const OutputTypeNewRoomEvent OutputType = "new_room_event"
// An OutputEvent is an entry in the roomserver output kafka log.
// Consumers should check the type field when consuming this event.
type OutputEvent struct {
// What sort of event this is.
Type OutputType `json:"type"`
// The content of event with type OutputTypeNewRoomEvent
NewRoomEvent *OutputNewRoomEvent `json:"new_room_event,omitempty"`
}
// An OutputNewRoomEvent is written when the roomserver receives a new event.
// It contains the full matrix room event and enough information for a
// consumer to construct the current state of the room and the state before the
// event.
......@@ -27,19 +42,19 @@ import (
// after a list of events. The current state is the state after the latest
// event IDs in the room. The state before an event is the state after its
// prev_events.
type OutputRoomEvent struct {
// The JSON bytes of the event.
Event []byte
type OutputNewRoomEvent struct {
// The Event.
Event gomatrixserverlib.Event `json:"event"`
// The latest events in the room after this event.
// This can be used to set the prev events for new events in the room.
// This also can be used to get the full current state after this event.
LatestEventIDs []string
LatestEventIDs []string `json:"latest_event_ids"`
// The state event IDs that were added to the state of the room by this event.
// Together with RemovesStateEventIDs this allows the receiver to keep an up to date
// view of the current state of the room.
AddsStateEventIDs []string
AddsStateEventIDs []string `json:"adds_state_event_ids"`
// The state event IDs that were removed from the state of the room by this event.
RemovesStateEventIDs []string
RemovesStateEventIDs []string `json:"removes_state_event_ids"`
// The ID of the event that was output before this event.
// Or the empty string if this is the first event output for this room.
// This is used by consumers to check if they can safely update their
......@@ -48,7 +63,7 @@ type OutputRoomEvent struct {
//
// If the LastSentEventID doesn't match what they were expecting it to be
// they can use the LatestEventIDs to request the full current state.
LastSentEventID string
LastSentEventID string `json:"last_sent_event_id"`
// The state event IDs that are part of the state at the event, but not
// part of the current state. Together with the StateBeforeRemovesEventIDs
// this can be used to construct the state before the event from the
......@@ -62,10 +77,10 @@ type OutputRoomEvent struct {
//
// The state is given as a delta against the current state because they are
// usually either the same state, or differ by just a couple of events.
StateBeforeAddsEventIDs []string
StateBeforeAddsEventIDs []string `json:"state_before_adds_event_ids"`
// The state event IDs that are part of the current state, but not part
// of the state at the event.
StateBeforeRemovesEventIDs []string
StateBeforeRemovesEventIDs []string `json:"state_before_removes_event_ids"`
// The server name to use to push this event to other servers.
// Or empty if this event shouldn't be pushed to other servers.
//
......@@ -81,66 +96,5 @@ type OutputRoomEvent struct {
//
// We encode the server name that the event should be sent using here to
// future proof the API for virtual hosting.
SendAsServer string
}
// UnmarshalJSON implements json.Unmarshaller
func (ore *OutputRoomEvent) UnmarshalJSON(data []byte) error {
// Create a struct rather than unmarshalling directly into the OutputRoomEvent
// so that we can use json.RawMessage.
// We use json.RawMessage so that the event JSON is sent as JSON rather than
// being base64 encoded which is the default for []byte.
var content struct {
Event *json.RawMessage
LatestEventIDs []string
AddsStateEventIDs []string
RemovesStateEventIDs []string
LastSentEventID string
StateBeforeAddsEventIDs []string
StateBeforeRemovesEventIDs []string
SendAsServer string
}
if err := json.Unmarshal(data, &content); err != nil {
return err
}
if content.Event != nil {
ore.Event = []byte(*content.Event)
}
ore.LatestEventIDs = content.LatestEventIDs
ore.AddsStateEventIDs = content.AddsStateEventIDs
ore.RemovesStateEventIDs = content.RemovesStateEventIDs
ore.LastSentEventID = content.LastSentEventID
ore.StateBeforeAddsEventIDs = content.StateBeforeAddsEventIDs
ore.StateBeforeRemovesEventIDs = content.StateBeforeRemovesEventIDs
ore.SendAsServer = content.SendAsServer
return nil
}
// MarshalJSON implements json.Marshaller
func (ore OutputRoomEvent) MarshalJSON() ([]byte, error) {
// Create a struct rather than marshalling directly from the OutputRoomEvent
// so that we can use json.RawMessage.
// We use json.RawMessage so that the event JSON is sent as JSON rather than
// being base64 encoded which is the default for []byte.
event := json.RawMessage(ore.Event)
content := struct {
Event *json.RawMessage
LatestEventIDs []string
AddsStateEventIDs []string
RemovesStateEventIDs []string
LastSentEventID string
StateBeforeAddsEventIDs []string
StateBeforeRemovesEventIDs []string
SendAsServer string
}{
Event: &event,
LatestEventIDs: ore.LatestEventIDs,
AddsStateEventIDs: ore.AddsStateEventIDs,
RemovesStateEventIDs: ore.RemovesStateEventIDs,
LastSentEventID: ore.LastSentEventID,
StateBeforeAddsEventIDs: ore.StateBeforeAddsEventIDs,
StateBeforeRemovesEventIDs: ore.StateBeforeRemovesEventIDs,
SendAsServer: ore.SendAsServer,
}
return json.Marshal(&content)
SendAsServer string `json:"send_as_server"`
}
......@@ -63,9 +63,13 @@ type Consumer struct {
}
// WriteOutputRoomEvent implements OutputRoomEventWriter
func (c *Consumer) WriteOutputRoomEvent(output api.OutputRoomEvent) error {
func (c *Consumer) WriteOutputRoomEvent(output api.OutputNewRoomEvent) error {
var m sarama.ProducerMessage
value, err := json.Marshal(output)
oe := api.OutputEvent{
Type: api.OutputTypeNewRoomEvent,
NewRoomEvent: &output,
}
value, err := json.Marshal(oe)
if err != nil {
return err
}
......
......@@ -44,7 +44,7 @@ type RoomEventDatabase interface {
// OutputRoomEventWriter has the APIs needed to write an event to the output logs.
type OutputRoomEventWriter interface {
// Write an event.
WriteOutputRoomEvent(output api.OutputRoomEvent) error
WriteOutputRoomEvent(output api.OutputNewRoomEvent) error
}
func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputRoomEvent) error {
......
......@@ -16,6 +16,7 @@ package input
import (
"bytes"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/types"
......@@ -201,8 +202,8 @@ func writeEvent(
latestEventIDs[i] = latest[i].EventID
}
ore := api.OutputRoomEvent{
Event: event.JSON(),
ore := api.OutputNewRoomEvent{
Event: event,
LastSentEventID: lastEventIDSent,
LatestEventIDs: latestEventIDs,
}
......
......@@ -71,35 +71,38 @@ func (s *OutputRoomEvent) Start() error {
// sync stream position may race and be incorrectly calculated.
func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON
var output api.OutputRoomEvent
var output api.OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.Event, false)
if err != nil {
log.WithError(err).Errorf("roomserver output log: event parse failure")
if output.Type != api.OutputTypeNewRoomEvent {
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
}
ev := output.NewRoomEvent.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
}).Info("received event from roomserver")
addsStateEvents, err := s.lookupStateEvents(output.AddsStateEventIDs, ev)
addsStateEvents, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev)
if err != nil {
log.WithFields(log.Fields{
"event": string(ev.JSON()),
log.ErrorKey: err,
"add": output.AddsStateEventIDs,
"del": output.RemovesStateEventIDs,
"add": output.NewRoomEvent.AddsStateEventIDs,
"del": output.NewRoomEvent.RemovesStateEventIDs,
}).Panicf("roomserver output log: state event lookup failure")
}
syncStreamPos, err := s.db.WriteEvent(
&ev, addsStateEvents, output.AddsStateEventIDs, output.RemovesStateEventIDs,
&ev, addsStateEvents, output.NewRoomEvent.AddsStateEventIDs, output.NewRoomEvent.RemovesStateEventIDs,
)
if err != nil {
......@@ -107,8 +110,8 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
log.WithFields(log.Fields{
"event": string(ev.JSON()),
log.ErrorKey: err,
"add": output.AddsStateEventIDs,
"del": output.RemovesStateEventIDs,
"add": output.NewRoomEvent.AddsStateEventIDs,
"del": output.NewRoomEvent.RemovesStateEventIDs,
}).Panicf("roomserver output log: write event failure")
return nil
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册