// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package input

import (
	"bytes"
19

20
	"github.com/matrix-org/dendrite/roomserver/api"
21
	"github.com/matrix-org/dendrite/roomserver/state"
22 23
	"github.com/matrix-org/dendrite/roomserver/types"
	"github.com/matrix-org/gomatrixserverlib"
24
	"github.com/matrix-org/util"
25 26
)

27 28
// updateLatestEvents updates the list of latest events for this room in the database and writes the
// event to the output log.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
// The latest events are the events that aren't referenced by another event in the database:
//
//     Time goes down the page. 1 is the m.room.create event (root).
//
//        1                 After storing 1 the latest events are {1}
//        |                 After storing 2 the latest events are {2}
//        2                 After storing 3 the latest events are {3}
//       / \                After storing 4 the latest events are {3,4}
//      3   4               After storing 5 the latest events are {5,4}
//      |   |               After storing 6 the latest events are {5,6}
//      5   6 <--- latest   After storing 7 the latest events are {6,7}
//      |
//      7 <----- latest
//
func updateLatestEvents(
44 45 46 47 48 49
	db RoomEventDatabase,
	ow OutputRoomEventWriter,
	roomNID types.RoomNID,
	stateAtEvent types.StateAtEvent,
	event gomatrixserverlib.Event,
	sendAsServer string,
50
) (err error) {
51
	updater, err := db.GetLatestEventsForUpdate(roomNID)
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
	if err != nil {
		return
	}
	defer func() {
		if err == nil {
			// Commit if there wasn't an error.
			// Set the returned err value if we encounter an error committing.
			// This only works because err is a named return.
			err = updater.Commit()
		} else {
			// Ignore any error we get rolling back since we don't want to
			// clobber the current error
			// TODO: log the error here.
			updater.Rollback()
		}
	}()

69
	err = doUpdateLatestEvents(db, updater, ow, roomNID, stateAtEvent, event, sendAsServer)
70 71 72 73
	return
}

func doUpdateLatestEvents(
74 75 76 77 78 79 80
	db RoomEventDatabase,
	updater types.RoomRecentEventsUpdater,
	ow OutputRoomEventWriter,
	roomNID types.RoomNID,
	stateAtEvent types.StateAtEvent,
	event gomatrixserverlib.Event,
	sendAsServer string,
81 82 83 84
) error {
	var err error
	var prevEvents []gomatrixserverlib.EventReference
	prevEvents = event.PrevEvents()
85 86 87
	oldLatest := updater.LatestEvents()
	lastEventIDSent := updater.LastEventIDSent()
	oldStateNID := updater.CurrentStateSnapshotNID()
88

89 90 91 92 93 94 95
	if hasBeenSent, err := updater.HasEventBeenSent(stateAtEvent.EventNID); err != nil {
		return err
	} else if hasBeenSent {
		// Already sent this event so we can stop processing
		return nil
	}

96 97 98 99
	if err = updater.StorePreviousEvents(stateAtEvent.EventNID, prevEvents); err != nil {
		return err
	}

100 101 102 103 104 105 106 107 108 109 110 111
	eventReference := event.EventReference()
	// Check if this event is already referenced by another event in the room.
	var alreadyReferenced bool
	if alreadyReferenced, err = updater.IsReferenced(eventReference); err != nil {
		return err
	}

	newLatest := calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{
		EventReference: eventReference,
		StateAtEvent:   stateAtEvent,
	})

112 113 114 115
	latestStateAtEvents := make([]types.StateAtEvent, len(newLatest))
	for i := range newLatest {
		latestStateAtEvents[i] = newLatest[i].StateAtEvent
	}
116
	newStateNID, err := state.CalculateAndStoreStateAfterEvents(db, roomNID, latestStateAtEvents)
117 118 119 120
	if err != nil {
		return err
	}

121
	removed, added, err := state.DifferenceBetweeenStateSnapshots(db, oldStateNID, newStateNID)
122 123 124 125
	if err != nil {
		return err
	}

126 127 128 129 130 131 132
	stateBeforeEventRemoves, stateBeforeEventAdds, err := state.DifferenceBetweeenStateSnapshots(
		db, newStateNID, stateAtEvent.BeforeStateSnapshotNID,
	)
	if err != nil {
		return err
	}

133 134 135 136 137 138 139 140
	// Send the event to the output logs.
	// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
	// (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but
	//  the write to the output log succeeds)
	// TODO: This assumes that writing the event to the output log is synchronous. It should be possible to
	// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
	// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
	// necessary bookkeeping we'll keep the event sending synchronous for now.
141 142
	if err = writeEvent(
		db, ow, lastEventIDSent, event, newLatest, removed, added,
143
		stateBeforeEventRemoves, stateBeforeEventAdds, sendAsServer,
144
	); err != nil {
145 146 147
		return err
	}

148
	if err = updater.SetLatestEvents(roomNID, newLatest, stateAtEvent.EventNID, newStateNID); err != nil {
149 150 151 152 153 154 155 156 157 158 159
		return err
	}

	if err = updater.MarkEventAsSent(stateAtEvent.EventNID); err != nil {
		return err
	}

	return nil
}

func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenced bool, prevEvents []gomatrixserverlib.EventReference, newEvent types.StateAtEventAndReference) []types.StateAtEventAndReference {
160
	var alreadyInLatest bool
161 162
	var newLatest []types.StateAtEventAndReference
	for _, l := range oldLatest {
163
		keep := true
164 165 166 167 168
		for _, prevEvent := range prevEvents {
			if l.EventID == prevEvent.EventID && bytes.Compare(l.EventSHA256, prevEvent.EventSHA256) == 0 {
				// This event can be removed from the latest events cause we've found an event that references it.
				// (If an event is referenced by another event then it can't be one of the latest events in the room
				//  because we have an event that comes after it)
169 170
				keep = false
				break
171
			}
172
		}
173
		if l.EventNID == newEvent.EventNID {
174 175 176
			alreadyInLatest = true
		}
		if keep {
177 178 179 180 181
			// Keep the event in the latest events.
			newLatest = append(newLatest, l)
		}
	}

182 183 184
	if !alreadyReferenced && !alreadyInLatest {
		// This event is not referenced by any of the events in the room
		// and the event is not already in the latest events.
185
		// Add it to the latest events
186
		newLatest = append(newLatest, newEvent)
187 188
	}

189 190 191
	return newLatest
}

192 193 194 195
func writeEvent(
	db RoomEventDatabase, ow OutputRoomEventWriter, lastEventIDSent string,
	event gomatrixserverlib.Event, latest []types.StateAtEventAndReference,
	removed, added []types.StateEntry,
196
	stateBeforeEventRemoves, stateBeforeEventAdds []types.StateEntry,
197
	sendAsServer string,
198
) error {
199 200 201 202

	latestEventIDs := make([]string, len(latest))
	for i := range latest {
		latestEventIDs[i] = latest[i].EventID
203 204
	}

205 206
	ore := api.OutputNewRoomEvent{
		Event:           event,
207 208
		LastSentEventID: lastEventIDSent,
		LatestEventIDs:  latestEventIDs,
209 210 211 212 213 214 215 216 217
	}

	var stateEventNIDs []types.EventNID
	for _, entry := range added {
		stateEventNIDs = append(stateEventNIDs, entry.EventNID)
	}
	for _, entry := range removed {
		stateEventNIDs = append(stateEventNIDs, entry.EventNID)
	}
218 219 220 221 222 223 224
	for _, entry := range stateBeforeEventRemoves {
		stateEventNIDs = append(stateEventNIDs, entry.EventNID)
	}
	for _, entry := range stateBeforeEventAdds {
		stateEventNIDs = append(stateEventNIDs, entry.EventNID)
	}
	stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))]
225 226 227 228 229 230 231 232 233 234
	eventIDMap, err := db.EventIDs(stateEventNIDs)
	if err != nil {
		return err
	}
	for _, entry := range added {
		ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID])
	}
	for _, entry := range removed {
		ore.RemovesStateEventIDs = append(ore.RemovesStateEventIDs, eventIDMap[entry.EventNID])
	}
235 236 237 238 239 240
	for _, entry := range stateBeforeEventRemoves {
		ore.StateBeforeRemovesEventIDs = append(ore.StateBeforeRemovesEventIDs, eventIDMap[entry.EventNID])
	}
	for _, entry := range stateBeforeEventAdds {
		ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID])
	}
241
	ore.SendAsServer = sendAsServer
242
	return ow.WriteOutputRoomEvent(ore)
243
}
244 245 246 247 248 249

// eventNIDSorter provides Len, Less and Swap over a slice of event NIDs,
// ordering the NIDs from smallest to largest.
type eventNIDSorter []types.EventNID

// Len returns the number of NIDs in the slice.
func (s eventNIDSorter) Len() int {
	return len(s)
}

// Less reports whether the NID at index i is smaller than the NID at index j.
func (s eventNIDSorter) Less(i, j int) bool {
	return s[i] < s[j]
}

// Swap exchanges the NIDs at indexes i and j.
func (s eventNIDSorter) Swap(i, j int) {
	tmp := s[i]
	s[i] = s[j]
	s[j] = tmp
}