diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go index 4a23d697741d12292eb924f32911b3f88d8479b1..72aaf3551c2590628c27faeff537707112726b10 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go @@ -18,14 +18,20 @@ import ( "context" "database/sql" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/syncapi/types" ) const accountDataSchema = ` --- Stores the users account data +-- This sequence is shared between all the tables generated from kafka logs. +CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id; + +-- Stores the types of account data that a user set has globally and in each room +-- and the stream ID when that type was last updated. CREATE TABLE IF NOT EXISTS syncapi_account_data_type ( - -- The highest numeric ID from the output_room_events at the time of saving the data - id BIGINT, + -- An incrementing ID which denotes the position in the log that this event resides at. + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), -- ID of the user the data belongs to user_id TEXT NOT NULL, -- ID of the room the data is related to (empty string if not related to a specific room) @@ -33,8 +39,6 @@ CREATE TABLE IF NOT EXISTS syncapi_account_data_type ( -- Type of the data type TEXT NOT NULL, - PRIMARY KEY(user_id, room_id, type), - -- We don't want two entries of the same type for the same user CONSTRAINT syncapi_account_data_unique UNIQUE (user_id, room_id, type) ); @@ -43,18 +47,23 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_account_data_id_idx ON syncapi_account ` const insertAccountDataSQL = "" + - "INSERT INTO syncapi_account_data_type (id, user_id, room_id, type) VALUES ($1, $2, $3, $4)" + + "INSERT INTO syncapi_account_data_type (user_id, room_id, type) VALUES ($1, $2, $3)" + " ON CONFLICT ON CONSTRAINT syncapi_account_data_unique" + - " DO UPDATE SET id = EXCLUDED.id" + " DO UPDATE SET id = EXCLUDED.id" + + " RETURNING id" const selectAccountDataInRangeSQL = "" + "SELECT room_id, type FROM syncapi_account_data_type" + " WHERE user_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id ASC" +const selectMaxAccountDataIDSQL = "" + + "SELECT MAX(id) FROM syncapi_account_data_type" + type accountDataStatements struct { insertAccountDataStmt *sql.Stmt selectAccountDataInRangeStmt *sql.Stmt + selectMaxAccountDataIDStmt *sql.Stmt } func (s *accountDataStatements) prepare(db *sql.DB) (err error) { @@ -68,15 +77,17 @@ func (s *accountDataStatements) prepare(db *sql.DB) (err error) { if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil { return } + if s.selectMaxAccountDataIDStmt, err = db.Prepare(selectMaxAccountDataIDSQL); err != nil { + return + } return } func (s *accountDataStatements) insertAccountData( ctx context.Context, - pos types.StreamPosition, userID, roomID, dataType string, -) (err error) { - _, err = s.insertAccountDataStmt.ExecContext(ctx, pos, userID, roomID, dataType) +) (pos int64, err error) { + s.insertAccountDataStmt.QueryRowContext(ctx, userID, roomID, dataType).Scan(&pos) return } @@ -116,3 +127,15 @@ func (s *accountDataStatements) selectAccountDataInRange( return } + +func (s *accountDataStatements) selectMaxAccountDataID( + ctx context.Context, txn *sql.Tx, +) (id int64, err error) { + var nullableID sql.NullInt64 + stmt := common.TxStmt(txn, s.selectMaxAccountDataIDStmt) + err = stmt.QueryRowContext(ctx).Scan(&nullableID) + if nullableID.Valid { + id = nullableID.Int64 + } + return +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index 7ae24990acb633ae5995fafe06a11fa8db0f11b4..660a3074dfd22d4237542b7f0c56e7b9bf71109d 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -26,12 +26,15 @@ import ( ) const outputRoomEventsSchema = ` +-- This sequence is shared between all the tables generated from kafka logs. +CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id; + -- Stores output room events received from the roomserver. CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( -- An incrementing ID which denotes the position in the log that this event resides at. -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments. -- This isn't a problem for us since we just want to order by this field. - id BIGSERIAL PRIMARY KEY, + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), -- The event ID for the event event_id TEXT NOT NULL, -- The 'room_id' key for the event. @@ -60,7 +63,7 @@ const selectRecentEventsSQL = "" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id DESC LIMIT $4" -const selectMaxIDSQL = "" + +const selectMaxEventIDSQL = "" + "SELECT MAX(id) FROM syncapi_output_room_events" // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). @@ -73,7 +76,7 @@ const selectStateInRangeSQL = "" + type outputRoomEventsStatements struct { insertEventStmt *sql.Stmt selectEventsStmt *sql.Stmt - selectMaxIDStmt *sql.Stmt + selectMaxEventIDStmt *sql.Stmt selectRecentEventsStmt *sql.Stmt selectStateInRangeStmt *sql.Stmt } @@ -89,7 +92,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil { return } - if s.selectMaxIDStmt, err = db.Prepare(selectMaxIDSQL); err != nil { + if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil { return } if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { @@ -170,11 +173,11 @@ func (s *outputRoomEventsStatements) selectStateInRange( // MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied, // then this function should only ever be used at startup, as it will race with inserting events if it is // done afterwards. If there are no inserted events, 0 is returned. -func (s *outputRoomEventsStatements) selectMaxID( +func (s *outputRoomEventsStatements) selectMaxEventID( ctx context.Context, txn *sql.Tx, ) (id int64, err error) { var nullableID sql.NullInt64 - stmt := common.TxStmt(txn, s.selectMaxIDStmt) + stmt := common.TxStmt(txn, s.selectMaxEventIDStmt) err = stmt.QueryRowContext(ctx).Scan(&nullableID) if nullableID.Valid { id = nullableID.Int64 diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 6594b938c8c5075f7ca3dfe8b565bedca9a2019d..a26b14c0e1bd5cd8396dc7c4591eb0a2380f85f1 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -174,11 +174,24 @@ func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, o // SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. func (d *SyncServerDatabase) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) { - id, err := d.events.selectMaxID(ctx, nil) + return d.syncStreamPositionTx(ctx, nil) +} + +func (d *SyncServerDatabase) syncStreamPositionTx( + ctx context.Context, txn *sql.Tx, +) (types.StreamPosition, error) { + maxID, err := d.events.selectMaxEventID(ctx, txn) + if err != nil { + return 0, err + } + maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn) if err != nil { - return types.StreamPosition(0), err + return 0, err + } + if maxAccountDataID > maxID { + maxID = maxAccountDataID } - return types.StreamPosition(id), nil + return types.StreamPosition(maxID), nil } // IncrementalSync returns all the data needed in order to create an incremental sync response. @@ -271,11 +284,10 @@ func (d *SyncServerDatabase) CompleteSync( defer common.EndTransaction(txn, &succeeded) // Get the current stream position which we will base the sync response on. - id, err := d.events.selectMaxID(ctx, txn) + pos, err := d.syncStreamPositionTx(ctx, txn) if err != nil { return nil, err } - pos := types.StreamPosition(id) // Extract room state and recent events for all rooms the user is joined to. roomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") @@ -348,13 +360,8 @@ func (d *SyncServerDatabase) GetAccountDataInRange( func (d *SyncServerDatabase) UpsertAccountData( ctx context.Context, userID, roomID, dataType string, ) (types.StreamPosition, error) { - pos, err := d.SyncStreamPosition(ctx) - if err != nil { - return pos, err - } - - err = d.accountData.insertAccountData(ctx, pos, userID, roomID, dataType) - return pos, err + pos, err := d.accountData.insertAccountData(ctx, userID, roomID, dataType) + return types.StreamPosition(pos), err } func (d *SyncServerDatabase) addInvitesToResponse(