latest_events.go 5.5 KB
Newer Older
1 2 3 4
package input

import (
	"bytes"
5
	"github.com/matrix-org/dendrite/roomserver/api"
6 7 8 9
	"github.com/matrix-org/dendrite/roomserver/types"
	"github.com/matrix-org/gomatrixserverlib"
)

10 11
// updateLatestEvents updates the list of latest events for this room in the database and writes the
// event to the output log.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
// The latest events are the events that aren't referenced by another event in the database:
//
//     Time goes down the page. 1 is the m.room.create event (root).
//
//        1                 After storing 1 the latest events are {1}
//        |                 After storing 2 the latest events are {2}
//        2                 After storing 3 the latest events are {3}
//       / \                After storing 4 the latest events are {3,4}
//      3   4               After storing 5 the latest events are {5,4}
//      |   |               After storing 6 the latest events are {5,6}
//      5   6 <--- latest   After storing 7 the latest events are {6,7}
//      |
//      7 <----- latest
//
func updateLatestEvents(
27
	db RoomEventDatabase, ow OutputRoomEventWriter, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event,
28
) (err error) {
29
	oldLatest, lastEventIDSent, updater, err := db.GetLatestEventsForUpdate(roomNID)
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
	if err != nil {
		return
	}
	defer func() {
		if err == nil {
			// Commit if there wasn't an error.
			// Set the returned err value if we encounter an error committing.
			// This only works because err is a named return.
			err = updater.Commit()
		} else {
			// Ignore any error we get rolling back since we don't want to
			// clobber the current error
			// TODO: log the error here.
			updater.Rollback()
		}
	}()

47
	err = doUpdateLatestEvents(updater, ow, oldLatest, lastEventIDSent, roomNID, stateAtEvent, event)
48 49 50 51
	return
}

func doUpdateLatestEvents(
52
	updater types.RoomRecentEventsUpdater, ow OutputRoomEventWriter, oldLatest []types.StateAtEventAndReference, lastEventIDSent string, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event,
53 54 55 56 57
) error {
	var err error
	var prevEvents []gomatrixserverlib.EventReference
	prevEvents = event.PrevEvents()

58 59 60 61 62 63 64
	if hasBeenSent, err := updater.HasEventBeenSent(stateAtEvent.EventNID); err != nil {
		return err
	} else if hasBeenSent {
		// Already sent this event so we can stop processing
		return nil
	}

65 66 67 68
	if err = updater.StorePreviousEvents(stateAtEvent.EventNID, prevEvents); err != nil {
		return err
	}

69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
	eventReference := event.EventReference()
	// Check if this event is already referenced by another event in the room.
	var alreadyReferenced bool
	if alreadyReferenced, err = updater.IsReferenced(eventReference); err != nil {
		return err
	}

	newLatest := calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{
		EventReference: eventReference,
		StateAtEvent:   stateAtEvent,
	})

	// Send the event to the output logs.
	// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
	// (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but
	//  the write to the output log succeeds)
	// TODO: This assumes that writing the event to the output log is synchronous. It should be possible to
	// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
	// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
	// necessary bookkeeping we'll keep the event sending synchronous for now.
	if err = writeEvent(ow, lastEventIDSent, event, newLatest); err != nil {
		return err
	}

	if err = updater.SetLatestEvents(roomNID, newLatest, stateAtEvent.EventNID); err != nil {
		return err
	}

	if err = updater.MarkEventAsSent(stateAtEvent.EventNID); err != nil {
		return err
	}

	return nil
}

func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenced bool, prevEvents []gomatrixserverlib.EventReference, newEvent types.StateAtEventAndReference) []types.StateAtEventAndReference {
105
	var alreadyInLatest bool
106 107
	var newLatest []types.StateAtEventAndReference
	for _, l := range oldLatest {
108
		keep := true
109 110 111 112 113
		for _, prevEvent := range prevEvents {
			if l.EventID == prevEvent.EventID && bytes.Compare(l.EventSHA256, prevEvent.EventSHA256) == 0 {
				// This event can be removed from the latest events cause we've found an event that references it.
				// (If an event is referenced by another event then it can't be one of the latest events in the room
				//  because we have an event that comes after it)
114 115
				keep = false
				break
116
			}
117
		}
118
		if l.EventNID == newEvent.EventNID {
119 120 121
			alreadyInLatest = true
		}
		if keep {
122 123 124 125 126
			// Keep the event in the latest events.
			newLatest = append(newLatest, l)
		}
	}

127 128 129
	if !alreadyReferenced && !alreadyInLatest {
		// This event is not referenced by any of the events in the room
		// and the event is not already in the latest events.
130
		// Add it to the latest events
131
		newLatest = append(newLatest, newEvent)
132 133
	}

134 135 136 137 138 139 140 141
	return newLatest
}

func writeEvent(ow OutputRoomEventWriter, lastEventIDSent string, event gomatrixserverlib.Event, latest []types.StateAtEventAndReference) error {

	latestEventIDs := make([]string, len(latest))
	for i := range latest {
		latestEventIDs[i] = latest[i].EventID
142 143
	}

144 145 146 147 148 149 150
	// TODO: Fill out AddsStateEventIDs and RemovesStateEventIDs
	// TODO: Fill out VisibilityStateIDs
	return ow.WriteOutputRoomEvent(api.OutputRoomEvent{
		Event:           event.JSON(),
		LastSentEventID: lastEventIDSent,
		LatestEventIDs:  latestEventIDs,
	})
151
}