diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/input.go b/src/github.com/matrix-org/dendrite/roomserver/api/input.go index 366541af5b47f1c2a4a330f7650dd3ceaea2c6ec..bfa4a9d581892abd73fea48941c9bf18e04746c2 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/input.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/input.go @@ -23,6 +23,7 @@ const ( ) // InputRoomEvent is a matrix room event to add to the room server database. +// TODO: Implement UnmarshalJSON/MarshalJSON in a way that does something sensible with the event JSON. type InputRoomEvent struct { // Whether this event is new, backfilled or an outlier. // This controls how the event is processed. diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go index 6d2783f73eb3ff71aeb1818feddabb5306f84e0d..14a3ce15fb0edffff3c4546d15a48927cb6f0d6f 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go @@ -2,12 +2,15 @@ package input import ( + "encoding/json" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/types" sarama "gopkg.in/Shopify/sarama.v1" ) // A ConsumerDatabase has the storage APIs needed by the consumer. type ConsumerDatabase interface { + RoomEventDatabase // PartitionOffsets returns the offsets the consumer has reached for each partition. PartitionOffsets(topic string) ([]types.PartitionOffset, error) // SetPartitionOffset records where the consumer has reached for a partition. @@ -87,7 +90,21 @@ func (c *Consumer) Start() error { func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) { defer pc.Close() for message := range pc.Messages() { - // TODO: Do stuff with message. + var input api.InputRoomEvent + if err := json.Unmarshal(message.Value, &input); err != nil { + // If the message is invalid then log it and move onto the next message in the stream. + c.logError(message, err) + } else { + if err := processRoomEvent(c.DB, input); err != nil { + // If there was an error processing the message then log it and + // move onto the next message in the stream. + // TODO: If the error was due to a problem talking to the database + // then we shouldn't move onto the next message and we should either + // retry processing the message, or panic and kill ourselves. + c.logError(message, err) + } + } + // Advance our position in the stream so that we will start at the right position after a restart. if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil { c.logError(message, err) } diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go new file mode 100644 index 0000000000000000000000000000000000000000..dfeff9536a2419b0d037f2082cd106590cf13ebe --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -0,0 +1,46 @@ +package input + +import ( + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" +) + +// A RoomEventDatabase has the storage APIs needed to store a room event. +type RoomEventDatabase interface { + StoreEvent(event gomatrixserverlib.Event) error +} + +func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error { + // Parse and validate the event JSON + event, err := gomatrixserverlib.NewEventFromUntrustedJSON(input.Event) + if err != nil { + return err + } + + if err := db.StoreEvent(event); err != nil { + return err + } + + // TODO: + // * Check that the event passes authentication checks. + + if input.Kind == api.KindOutlier { + // For outliers we can stop after we've stored the event itself as it + // doesn't have any associated state to store and we don't need to + // notify anyone about it. + return nil + } + + // TODO: + // * Calcuate the state at the event if necessary. + // * Store the state at the event. + // * Update the extremities of the event graph for the room + // * Caculate the new current state for the room if the forward extremities have changed. + // * Work out the delta between the new current state and the previous current state. + // * Work out the visibility of the event. + // * Write a message to the output logs containing: + // - The event itself + // - The visiblity of the event, i.e. who is allowed to see the event. + // - The changes to the current state of the room. + panic("Not implemented") +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go index 3e988f2138e1fb5004c2397bde97c0e573baf8b2..0a3d6b57333a7b4303473471deaa0a02c143cf8e 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go @@ -8,35 +8,70 @@ import ( type statements struct { selectPartitionOffsetsStmt *sql.Stmt upsertPartitionOffsetStmt *sql.Stmt + insertEventTypeNIDStmt *sql.Stmt + selectEventTypeNIDStmt *sql.Stmt + insertEventStateKeyNIDStmt *sql.Stmt + selectEventStateKeyNIDStmt *sql.Stmt + insertRoomNIDStmt *sql.Stmt + selectRoomNIDStmt *sql.Stmt + insertEventStmt *sql.Stmt + insertEventJSONStmt *sql.Stmt } func (s *statements) prepare(db *sql.DB) error { var err error - _, err = db.Exec(partitionOffsetsSchema) - if err != nil { + if err = s.preparePartitionOffsets(db); err != nil { return err } - if s.selectPartitionOffsetsStmt, err = db.Prepare(selectPartitionOffsetsSQL); err != nil { + if err = s.prepareEventTypes(db); err != nil { return err } - if s.upsertPartitionOffsetStmt, err = db.Prepare(upsertPartitionOffsetsSQL); err != nil { + + if err = s.prepareEventStateKeys(db); err != nil { + return err + } + + if err = s.prepareRooms(db); err != nil { + return err + } + + if err = s.prepareEvents(db); err != nil { return err } + + if err = s.prepareEventJSON(db); err != nil { + return err + } + return nil } +func (s *statements) preparePartitionOffsets(db *sql.DB) (err error) { + _, err = db.Exec(partitionOffsetsSchema) + if err != nil { + return + } + if s.selectPartitionOffsetsStmt, err = db.Prepare(selectPartitionOffsetsSQL); err != nil { + return + } + if s.upsertPartitionOffsetStmt, err = db.Prepare(upsertPartitionOffsetsSQL); err != nil { + return + } + return +} + const partitionOffsetsSchema = ` -- The offsets that the server has processed up to. CREATE TABLE IF NOT EXISTS partition_offsets ( - -- The name of the topic. - topic TEXT NOT NULL, - -- The 32-bit partition ID - partition INTEGER NOT NULL, - -- The 64-bit offset. - partition_offset BIGINT NOT NULL, - CONSTRAINT topic_partition_unique UNIQUE (topic, partition) + -- The name of the topic. + topic TEXT NOT NULL, + -- The 32-bit partition ID + partition INTEGER NOT NULL, + -- The 64-bit offset. + partition_offset BIGINT NOT NULL, + CONSTRAINT topic_partition_unique UNIQUE (topic, partition) ); ` @@ -68,3 +103,276 @@ func (s *statements) upsertPartitionOffset(topic string, partition int32, offset _, err := s.upsertPartitionOffsetStmt.Exec(topic, partition, offset) return err } + +func (s *statements) prepareEventTypes(db *sql.DB) (err error) { + _, err = db.Exec(eventTypesSchema) + if err != nil { + return + } + if s.insertEventTypeNIDStmt, err = db.Prepare(insertEventTypeNIDSQL); err != nil { + return + } + if s.selectEventTypeNIDStmt, err = db.Prepare(selectEventTypeNIDSQL); err != nil { + return + } + return +} + +const eventTypesSchema = ` +-- Numeric versions of the event "type"s. Event types tend to be taken from a +-- small common pool. Assigning each a numeric ID should reduce the amount of +-- data that needs to be stored and fetched from the database. +-- It also means that many operations can work with int64 arrays rather than +-- string arrays which may help reduce GC pressure. +-- Well known event types are pre-assigned numeric IDs: +-- 1 -> m.room.create +-- 2 -> m.room.power_levels +-- 3 -> m.room.join_rules +-- 4 -> m.room.third_party_invite +-- 5 -> m.room.member +-- 6 -> m.room.redaction +-- 7 -> m.room.history_visibility +-- Picking well-known numeric IDs for the events types that require special +-- attention during state conflict resolution means that we write that code +-- using numeric constants. +-- It also means that the numeric IDs for common event types should be +-- consistent between different instances which might make ad-hoc debugging +-- easier. +-- Other event types are automatically assigned numeric IDs starting from 2**16. +-- This leaves room to add more pre-assigned numeric IDs and clearly separates +-- the automatically assigned IDs from the pre-assigned IDs. +CREATE SEQUENCE IF NOT EXISTS event_type_nid_seq START 65536; +CREATE TABLE IF NOT EXISTS event_types ( + -- Local numeric ID for the event type. + event_type_nid BIGINT PRIMARY KEY DEFAULT nextval('event_type_nid_seq'), + -- The string event_type. + event_type TEXT NOT NULL CONSTRAINT event_type_unique UNIQUE +); +INSERT INTO event_types (event_type_nid, event_type) VALUES + (1, 'm.room.create'), + (2, 'm.room.power_levels'), + (3, 'm.room.join_rules'), + (4, 'm.room.third_party_invite'), + (5, 'm.room.member'), + (6, 'm.room.redaction'), + (7, 'm.room.history_visibility') ON CONFLICT DO NOTHING; +` + +// Assign a new numeric event type ID. +// The usual case is that the event type is not in the database. +// In that case the ID will be assigned using the next value from the sequence. +// We use `RETURNING` to tell postgres to return the assigned ID. +// But it's possible that the type was added in a query that raced with us. +// This will result in a conflict on the event_type_unique constraint. +// We peform a update that does nothing rather that doing nothing at all because +// postgres won't return anything unless we touch a row in the table. +const insertEventTypeNIDSQL = "" + + "INSERT INTO event_types (event_type) VALUES ($1)" + + " ON CONFLICT ON CONSTRAINT event_type_unique" + + " DO UPDATE SET event_type = $1" + + " RETURNING (event_type_nid)" + +const selectEventTypeNIDSQL = "" + + "SELECT event_type_nid FROM event_types WHERE event_type = $1" + +func (s *statements) insertEventTypeNID(eventType string) (eventTypeNID int64, err error) { + err = s.insertEventTypeNIDStmt.QueryRow(eventType).Scan(&eventTypeNID) + return +} + +func (s *statements) selectEventTypeNID(eventType string) (eventTypeNID int64, err error) { + err = s.selectEventTypeNIDStmt.QueryRow(eventType).Scan(&eventTypeNID) + return +} + +func (s *statements) prepareEventStateKeys(db *sql.DB) (err error) { + _, err = db.Exec(eventStateKeysSchema) + if err != nil { + return + } + if s.insertEventStateKeyNIDStmt, err = db.Prepare(insertEventStateKeyNIDSQL); err != nil { + return + } + if s.selectEventStateKeyNIDStmt, err = db.Prepare(selectEventStateKeyNIDSQL); err != nil { + return + } + return +} + +const eventStateKeysSchema = ` +-- Numeric versions of the event "state_key"s. State keys tend to be reused so +-- assigning each string a numeric ID should reduce the amount of data that +-- needs to be stored and fetched from the database. +-- It also means that many operations can work with int64 arrays rather than +-- string arrays which may help reduce GC pressure. +-- Well known state keys are pre-assigned numeric IDs: +-- 1 -> "" (the empty string) +-- Other state keys are automatically assigned numeric IDs starting from 2**16. +-- This leaves room to add more pre-assigned numeric IDs and clearly separates +-- the automatically assigned IDs from the pre-assigned IDs. +CREATE SEQUENCE IF NOT EXISTS event_state_key_nid_seq START 65536; +CREATE TABLE IF NOT EXISTS event_state_keys ( + -- Local numeric ID for the state key. + event_state_key_nid BIGINT PRIMARY KEY DEFAULT nextval('event_state_key_nid_seq'), + event_state_key TEXT NOT NULL CONSTRAINT event_state_key_unique UNIQUE +); +INSERT INTO event_state_keys (event_state_key_nid, event_state_key) VALUES + (1, '') ON CONFLICT DO NOTHING; +` + +// Same as insertEventTypeNIDSQL +const insertEventStateKeyNIDSQL = "" + + "INSERT INTO event_state_keys (event_state_key) VALUES ($1)" + + " ON CONFLICT ON CONSTRAINT event_state_key_unique" + + " DO UPDATE SET event_state_key = $1" + + " RETURNING (event_state_key_nid)" + +const selectEventStateKeyNIDSQL = "" + + "SELECT event_state_key_nid FROM event_state_keys WHERE event_state_key = $1" + +func (s *statements) insertEventStateKeyNID(eventStateKey string) (eventStateKeyNID int64, err error) { + err = s.insertEventStateKeyNIDStmt.QueryRow(eventStateKey).Scan(&eventStateKeyNID) + return +} + +func (s *statements) selectEventStateKeyNID(eventStateKey string) (eventStateKeyNID int64, err error) { + err = s.selectEventStateKeyNIDStmt.QueryRow(eventStateKey).Scan(&eventStateKeyNID) + return +} + +func (s *statements) prepareRooms(db *sql.DB) (err error) { + _, err = db.Exec(roomsSchema) + if err != nil { + return + } + if s.insertRoomNIDStmt, err = db.Prepare(insertRoomNIDSQL); err != nil { + return + } + if s.selectRoomNIDStmt, err = db.Prepare(selectRoomNIDSQL); err != nil { + return + } + return +} + +const roomsSchema = ` +CREATE SEQUENCE IF NOT EXISTS room_nid_seq; +CREATE TABLE IF NOT EXISTS rooms ( + -- Local numeric ID for the room. + room_nid BIGINT PRIMARY KEY DEFAULT nextval('room_nid_seq'), + -- Textual ID for the room. + room_id TEXT NOT NULL CONSTRAINT room_id_unique UNIQUE +); +` + +// Same as insertEventTypeNIDSQL +const insertRoomNIDSQL = "" + + "INSERT INTO rooms (room_id) VALUES ($1)" + + " ON CONFLICT ON CONSTRAINT room_id_unique" + + " DO UPDATE SET room_id = $1" + + " RETURNING (room_nid)" + +const selectRoomNIDSQL = "" + + "SELECT room_nid FROM rooms WHERE room_id = $1" + +func (s *statements) insertRoomNID(roomID string) (roomNID int64, err error) { + err = s.insertRoomNIDStmt.QueryRow(roomID).Scan(&roomNID) + return +} + +func (s *statements) selectRoomNID(roomID string) (roomNID int64, err error) { + err = s.selectRoomNIDStmt.QueryRow(roomID).Scan(&roomNID) + return +} + +const eventsSchema = ` +-- The events table holds metadata for each event, the actual JSON is stored +-- separately to keep the size of the rows small. +CREATE SEQUENCE IF NOT EXISTS event_nid_seq; +CREATE TABLE IF NOT EXISTS events ( + -- Local numeric ID for the event. + event_nid BIGINT PRIMARY KEY DEFAULT nextval('event_nid_seq'), + -- Local numeric ID for the room the event is in. + -- This is never 0. + room_nid BIGINT NOT NULL, + -- Local numeric ID for the type of the event. + -- This is never 0. + event_type_nid BIGINT NOT NULL, + -- Local numeric ID for the state_key of the event + -- This is 0 if the event is not a state event. + event_state_key_nid BIGINT NOT NULL, + -- The textual event id. + -- Used to lookup the numeric ID when processing requests. + -- Needed for state resolution. + -- An event may only appear in this table once. + event_id TEXT NOT NULL CONSTRAINT event_id_unique UNIQUE, + -- The sha256 reference hash for the event. + -- Needed for setting reference hashes when sending new events. + reference_sha256 BYTEA NOT NULL +); +` + +const insertEventSQL = "" + + "INSERT INTO events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256)" + + " VALUES ($1, $2, $3, $4, $5)" + + " ON CONFLICT ON CONSTRAINT event_id_unique" + + " DO UPDATE SET event_id = $1" + + " RETURNING event_nid" + +func (s *statements) prepareEvents(db *sql.DB) (err error) { + _, err = db.Exec(eventsSchema) + if err != nil { + return + } + if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { + return + } + return +} + +func (s *statements) insertEvent( + roomNID, eventTypeNID, eventStateKeyNID int64, + eventID string, + referenceSHA256 []byte, +) (eventNID int64, err error) { + err = s.insertEventStmt.QueryRow( + roomNID, eventTypeNID, eventStateKeyNID, eventID, referenceSHA256, + ).Scan(&eventNID) + return +} + +func (s *statements) prepareEventJSON(db *sql.DB) (err error) { + _, err = db.Exec(eventJSONSchema) + if err != nil { + return + } + if s.insertEventJSONStmt, err = db.Prepare(insertEventJSONSQL); err != nil { + return + } + return +} + +const eventJSONSchema = ` +-- Stores the JSON for each event. This kept separate from the main events +-- table to keep the rows in the main events table small. +CREATE TABLE IF NOT EXISTS event_json ( + -- Local numeric ID for the event. + event_nid BIGINT NOT NULL PRIMARY KEY, + -- The JSON for the event. + -- Stored as TEXT because this should be valid UTF-8. + -- Not stored as a JSONB because we always just pull the entire event + -- so there is no point in postgres parsing it. + -- Not stored as JSON because we already validate the JSON in the server + -- so there is no point in postgres validating it. + -- TODO: Should we be compressing the events with Snappy or DEFLATE? + event_json TEXT NOT NULL +); +` + +const insertEventJSONSQL = "" + + "INSERT INTO event_json (event_nid, event_json) VALUES ($1, $2)" + + " ON CONFLICT DO NOTHING" + +func (s *statements) insertEventJSON(eventNID int64, eventJSON []byte) error { + _, err := s.insertEventJSONStmt.Exec(eventNID, eventJSON) + return err +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go index 2b162a81fd8ab8b6aa41e175646d8286be6bb55e..1d6d7a327fc177d8c3b33aeebedeb6885a80a793 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -5,6 +5,7 @@ import ( // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/gomatrixserverlib" ) // A Database is used to store room events and stream offsets. @@ -35,3 +36,82 @@ func (d *Database) PartitionOffsets(topic string) ([]types.PartitionOffset, erro func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error { return d.statements.upsertPartitionOffset(topic, partition, offset) } + +// StoreEvent implements input.EventDatabase +func (d *Database) StoreEvent(event gomatrixserverlib.Event) error { + var ( + roomNID int64 + eventTypeNID int64 + eventStateKeyNID int64 + eventNID int64 + err error + ) + + if roomNID, err = d.assignRoomNID(event.RoomID()); err != nil { + return err + } + + if eventTypeNID, err = d.assignEventTypeNID(event.Type()); err != nil { + return err + } + + eventStateKey := event.StateKey() + // Assigned a numeric ID for the state_key if there is one present. + // Otherwise set the numeric ID for the state_key to 0. + if eventStateKey != nil { + if eventStateKeyNID, err = d.assignStateKeyNID(*eventStateKey); err != nil { + return err + } + } + + if eventNID, err = d.statements.insertEvent( + roomNID, + eventTypeNID, + eventStateKeyNID, + event.EventID(), + event.EventReference().EventSHA256, + ); err != nil { + return err + } + + return d.statements.insertEventJSON(eventNID, event.JSON()) +} + +func (d *Database) assignRoomNID(roomID string) (int64, error) { + // Check if we already have a numeric ID in the database. + roomNID, err := d.statements.selectRoomNID(roomID) + if err == sql.ErrNoRows { + // We don't have a numeric ID so insert one into the database. + return d.statements.insertRoomNID(roomID) + } + if err != nil { + return 0, err + } + return roomNID, nil +} + +func (d *Database) assignEventTypeNID(eventType string) (int64, error) { + // Check if we already have a numeric ID in the database. + eventTypeNID, err := d.statements.selectEventTypeNID(eventType) + if err == sql.ErrNoRows { + // We don't have a numeric ID so insert one into the database. + return d.statements.insertEventTypeNID(eventType) + } + if err != nil { + return 0, err + } + return eventTypeNID, nil +} + +func (d *Database) assignStateKeyNID(eventStateKey string) (int64, error) { + // Check if we already have a numeric ID in the database. + eventStateKeyNID, err := d.statements.selectEventStateKeyNID(eventStateKey) + if err == sql.ErrNoRows { + // We don't have a numeric ID so insert one into the database. + return d.statements.insertEventStateKeyNID(eventStateKey) + } + if err != nil { + return 0, err + } + return eventStateKeyNID, nil +}