Commit bde8bc21 authored by tanggen

init Ligase

Parent 139fae98

Too many changes to show. To preserve performance only 1000 of 1000+ files are displayed.
root = true
[*]
charset = utf-8
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true
[*.go]
indent_style = tab
indent_size = 4
[*.md]
trim_trailing_whitespace = false
[*.{yml,yaml}]
indent_style = space
---
name: "\U0001F41B Bug Report"
about: As a User, I want to report a Bug.
title: ''
labels: bug
assignees: ''
---
## Bug Report
Please answer these questions before submitting your issue. Thanks!
### 1. Minimal reproduce step (Required)
<!-- a step by step guide for reproducing the bug. -->
### 2. What did you expect to see? (Required)
### 3. What did you see instead? (Required)
### 4. Affected version (Required)
<!-- v3.0.0, v4.0.0, etc -->
### 5. Root Cause Analysis
<!-- should be filled by the investigator before it's closed -->
---
name: "\U0001F680 Development Task"
about: As a Ligase developer, I want to record a development task.
title: ''
labels: enhancement
assignees: ''
---
## Development Task
---
name: "\U0001F680 Feature Request"
about: As a user, I want to request a New Feature on the product.
title: ''
labels: enhancement
assignees: ''
---
## Feature Request
**Is your feature request related to a problem? Please describe:**
<!-- A clear and concise description of what the problem is. Ex. I'm always frustrated when [...] -->
**Describe the feature you'd like:**
<!-- A clear and concise description of what you want to happen. -->
**Describe alternatives you've considered:**
<!-- A clear and concise description of any alternative solutions or features you've considered. -->
**Teachability, Documentation, Adoption, Migration Strategy:**
<!-- If you can, explain some scenarios how users might use this, situations it would be helpful in. Any API designs, mockups, or diagrams are also helpful. -->
---
name: "\U0001F914 Ask a Question"
about: I want to ask a question.
labels: question
---
## Question
### Pull Request Checklist
<!-- Please read CONTRIBUTING.md before submitting your pull request -->
* [ ] Pull request is based on the develop branch
<!-- Thank you for contributing to Ligase!
PR Title Format:
1. *: what's changed
-->
### What problem does this PR solve?
Issue Number: close #xxx <!-- REMOVE this line if no issue to close -->
Problem Summary:
### What is changed and how it works?
What's Changed:
How it Works:
### Check List <!--REMOVE the items that are not applicable-->
Tests <!-- At least one of them must be included. -->
- Unit test
- Integration test
- Manual test (add detailed scripts or steps below)
- No code
Side effects
- Performance regression
- Consumes more CPU
- Consumes more MEM
- Breaking backward compatibility
### Release note <!-- bugfixes or new feature need a release note -->
.*.swp
# Hidden files
.*
# Downloads
kafka.tgz
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Folders
/kafka
/bin
/pkg
/_obj
/_test
/vendor/bin
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
# Test binary, built with `go test -c`
*.test
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
_testmain.go
# Dependency directories (remove the comment below to include it)
# vendor/
*.exe
*.test
*.prof
bin
.vscode
log
.DS_Store
language: go
go:
- 1.8
- 1.9
env:
- TEST_SUITE="lint"
- TEST_SUITE="unit-test"
- TEST_SUITE="integ-test"
sudo: false
# Use trusty for postgres 9.5 support
dist: trusty
addons:
postgresql: "9.5"
services:
- postgresql
install:
- go get github.com/constabulary/gb/...
script:
- ./scripts/travis-test.sh
notifications:
webhooks:
urls:
- "https://scalar.vector.im/api/neb/services/hooks/dHJhdmlzLWNpLyU0MGtlZ2FuJTNBbWF0cml4Lm9yZy8lMjFhWmthbkFuV0VkeGNSSVFrV24lM0FtYXRyaXgub3Jn"
on_success: change # always|never|change
on_failure: always
on_start: never
# Code Style
We follow the standard Go style using gofmt, but with a few extra
considerations.
## Linters
We use `gometalinter` to run a number of linters, the exact list can be found
in [linter.json](linter.json). Some of these are slow and expensive to run, but
a subset can be found in [linter-fast.json](linter-fast.json) that run quickly
enough to be run as part of an IDE.
For rare cases where a linter is giving a spurious warning, it can be disabled
for that line or statement using a [comment directive](https://github.com/alecthomas/gometalinter#comment-directives), e.g.
`// nolint: gocyclo`. This should be used sparingly and only when it's clear
that the lint warning is spurious.
The linters are vendored, and can be run using [scripts/find-lint.sh](scripts/find-lint.sh)
(see file for docs) or as part of a build/test/lint cycle using
[scripts/build-test-lint.sh](scripts/build-test-lint.sh).
## HTTP Error Handling
Unfortunately, converting errors into HTTP responses with the correct status
code and message can be done in a number of ways in golang:
1. Having functions return `JSONResponse` directly, which can then either set
it to an error response or a `200 OK`.
2. Have the HTTP handler try to cast error values to types that are handled
differently.
3. Have the HTTP handler call functions whose errors can only be interpreted
one way, for example if a `validate(...)` call returns an error then the handler
knows to respond with a `400 Bad Request`.
We attempt to always use option #3, as it more naturally fits with the way that
golang generally does error handling. In particular, option #1 effectively
requires reinventing a new error handling scheme just for HTTP handlers.
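A minimal sketch of option #3 (the `Event` type, `validate`, and `putEvent` are hypothetical names for illustration, not Dendrite APIs):

```go
package main

import (
	"encoding/json"
	"errors"
	"net/http"
)

// Event and validate stand in for a real request body and validator.
type Event struct {
	Type string `json:"type"`
}

// validate returns an error that can only be interpreted one way: the
// request is malformed, so the handler always maps it to 400 Bad Request.
func validate(ev *Event) error {
	if ev.Type == "" {
		return errors.New("missing event type")
	}
	return nil
}

func putEvent(w http.ResponseWriter, r *http.Request) {
	var ev Event
	if err := json.NewDecoder(r.Body).Decode(&ev); err != nil {
		http.Error(w, "malformed JSON", http.StatusBadRequest)
		return
	}
	if err := validate(&ev); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	w.WriteHeader(http.StatusOK)
}

func main() {
	http.HandleFunc("/rooms/send", putEvent)
	_ = http.ListenAndServe(":8008", nil)
}
```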
## Line length
We strive for a line length of roughly 80 characters, though less than 100 is
acceptable if necessary. Longer lines are fine if there is nothing of interest
after the first 80-100 characters (e.g. long string literals).
## TODOs and FIXMEs
The majority of TODOs and FIXMEs should have an associated tracking issue on
GitHub. These can be added just before merging the PR to master, and the
issue number should be added to the comment, e.g. `// TODO(#324): ...`
## Visual Studio Code
If you use VSCode then the following is an example of a workspace setting that
sets up linting correctly:
```json
{
"go.gopath": "${workspaceRoot}:${workspaceRoot}/vendor",
"go.lintOnSave": "workspace",
"go.lintTool": "gometalinter",
"go.lintFlags": ["--config=linter-fast.json", "--concurrency=5"]
}
```
# Contributing to Dendrite
# Overview
Everyone is welcome to contribute to Dendrite! We aim to make it as easy as
possible to get started.
Ligase is a community-driven open source project and we welcome any contributor!
Please ensure that you sign off your contributions! See [Sign Off](#sign-off)
section below.
If you think something should be changed or added, please open an issue to discuss the change. You can open a PR if you want to be explicit about the change, but the change may need extensive discussion and possibly revision before it is accepted.
## Getting up and running
Feedback is welcome; feel free to open an issue for any problem.
See [INSTALL.md](INSTALL.md) for instructions on setting up a running dev
instance of dendrite, and [CODE_STYLE.md](CODE_STYLE.md) for the code style
guide.
# Development Environment
We use `gb` for managing our dependencies, so `gb build` and `gb test` are how
to build dendrite and run the unit tests respectively. There are [scripts](scripts)
for [linting](scripts/find-lint.sh) and doing a [build/test/lint run](scripts/build-test-lint.sh).
[How to Write Go Code](http://golang.org/doc/code.html)
Ligase uses [Go Modules](https://github.com/golang/go/wiki/Modules) to manage dependencies.
## Picking Things To Do
The version of Go should be **1.13** or above.
If you're new then feel free to pick up an issue labelled [good first issue](https://github.com/matrix-org/dendrite/labels/good%20first%20issue).
These should be well-contained, small pieces of work that can be picked up to
help you get familiar with the code base.
# Style Guide
Once you're comfortable with hacking on Dendrite there are issues labelled as
[help wanted](https://github.com/matrix-org/dendrite/labels/help%20wanted), these
are often slightly larger or more complicated pieces of work but are hopefully
nonetheless fairly well-contained.
Working with our source code involves some famous rules:
We ask people who are familiar with Dendrite to leave the [good first issue](https://github.com/matrix-org/dendrite/labels/good%20first%20issue)
issues so that there is always a way for new people to come and get involved.
[Effective Go](https://golang.org/doc/effective_go.html)
## Getting Help
[Go Code Review Comments](https://github.com/golang/go/wiki/CodeReviewComments)
For questions related to developing on Dendrite we have a dedicated room on
Matrix [#dendrite-dev:matrix.org](https://riot.im/develop/#/room/#dendrite-dev:matrix.org)
where we're happy to help.
# Workflow
For more general questions please use [#dendrite:matrix.org](https://riot.im/develop/#/room/#dendrite:matrix.org).
## Step 1: Fork in the cloud
## Sign off
1. Visit https://github.com/finogeeks/ligase
2. On the top right of the page, click the `Fork` button to create a cloud-based fork of the repository.
We ask that everyone who contributes to the project signs off their
contributions, in accordance with the [DCO](https://github.com/matrix-org/matrix-doc/blob/master/CONTRIBUTING.rst#sign-off).
## Step 2: Clone fork to local storage
```sh
mkdir -p $working_dir
cd $working_dir
git clone https://github.com/$user/ligase.git
# or: git clone git@github.com:$user/ligase.git
cd $working_dir/ligase
git remote add upstream https://github.com/finogeeks/ligase.git
# or: git remote add upstream git@github.com:finogeeks/ligase.git
# Never push to the upstream master.
git remote set-url --push upstream no_push
# Confirm that your remotes make sense:
# It should look like:
# origin git@github.com:$user/ligase.git (fetch)
# origin git@github.com:$user/ligase.git (push)
# upstream https://github.com/finogeeks/ligase (fetch)
# upstream no_push (push)
git remote -v
```
## Step 3: Branch
```sh
cd $working_dir/ligase
git fetch upstream
# Base your changes on the develop branch.
git checkout -b develop
git rebase upstream/develop
```
Branch from develop:
```sh
git checkout -b myfeature
```
## Step 4: Develop
### Edit the code
You can now edit the code on the `myfeature` branch.
### Build && Run Ligase
```sh
# start up the dependency services
docker-compose up -d
# build
./build.sh
# run the server
./run.sh
```
### Test
```sh
# build and run the unit tests to make sure all tests pass.
make test
# Check the checklist (gofmt -> golint)
make checklist
```
## Step 5: Keep your branch in sync
```sh
# While on your myfeature branch.
git fetch upstream
git rebase upstream/develop
```
Please don't use `git pull` instead of the above `fetch`/`rebase`. `git pull`
does a merge, which leaves merge commits. These make the commit history messy
and violate the principle that commits ought to be individually understandable
and useful (see below). You can also consider changing your `.git/config` file
via `git config branch.autoSetupRebase always` to change the behavior of `git pull`.
## Step 6: Commit
Commit your changes.
```sh
git commit
```
Likely you'll go back and edit/build/test further, and then `commit --amend` in a
few cycles.
## Step 7: Push
When the changes are ready to review (or you just want to create an offsite backup
of your work), push your branch to your fork on `github.com`:
```sh
git push --set-upstream ${your_remote_name} myfeature
```
## Step 8: Create a pull request
1. Visit your fork at `https://github.com/$user/ligase`.
2. Click the `Compare & Pull Request` button next to your `myfeature` branch.
3. Fill in the required information in the PR template.
### Get a code review
Once your pull request (PR) is opened, it will be assigned to one or more
reviewers. Those reviewers will do a thorough code review, looking at
correctness, bugs, opportunities for improvement, documentation and comments,
and style.
To address review comments, you should commit the changes to the same branch of
the PR on your fork.
# Design
## Log Based Architecture
### Decomposition and Decoupling
A matrix homeserver can be built around append-only event logs built from the
messages, receipts, presence, typing notifications, device messages and other
events sent by users on the homeservers or by other homeservers.
The server would then decompose into two categories: writers that add new
entries to the logs and readers that read those entries.
The event logs then serve to decouple the two components, the writers and
readers need only agree on the format of the entries in the event log.
This format could be largely derived from the wire format of the events used
in the client and federation protocols:
C-S API +---------+ Event Log +---------+ C-S API
---------> | |+ (e.g. kafka) | |+ --------->
| Writers || =============> | Readers ||
---------> | || | || --------->
S-S API +---------+| +---------+| S-S API
+---------+ +---------+
However, the way Matrix handles state events in a room creates a few
complications for this model.
1) Writers require the room state at an event to check if it is allowed.
2) Readers require the room state at an event to determine the users and
servers that are allowed to see the event.
3) A client can query the current state of the room from a reader.
The writers and readers cannot extract the necessary information directly from
the event logs because it would take too long to extract the information as the
state is built up by collecting individual state events from the event history.
The writers and readers therefore need access to something that stores copies
of the event state in a form that can be efficiently queried. One possibility
would be for the readers and writers to maintain copies of the current state
in local databases. A second possibility would be to add a dedicated component
that maintained the state of the room and exposed an API that the readers and
writers could query to get the state. The second has the advantage that the
state is calculated and stored in a single location.
C-S API +---------+ Log +--------+ Log +---------+ C-S API
---------> | |+ ======> | | ======> | |+ --------->
| Writers || | Room | | Readers ||
---------> | || <------ | Server | ------> | || --------->
S-S API +---------+| Query | | Query +---------+| S-S API
+---------+ +--------+ +---------+
The room server can annotate the events it logs to the readers with room state
so that the readers can avoid querying the room server unnecessarily.
[This architecture can be extended to cover most of the APIs.](WIRING.md)
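A rough Go sketch of this decomposition (the interface and type names are illustrative, not the actual codebase types):

```go
package design

import "context"

// LogEntry is the agreed-upon format that decouples writers from readers;
// it is largely the wire format of the client/federation protocols.
type LogEntry struct {
	RoomID string
	JSON   []byte
}

// Writer appends new entries to an event log (e.g. a Kafka topic).
type Writer interface {
	Append(ctx context.Context, e LogEntry) error
}

// Reader consumes entries from the log in order.
type Reader interface {
	Next(ctx context.Context) (LogEntry, error)
}

// RoomServer is the dedicated component that maintains room state in a form
// that writers and readers can query efficiently, instead of rebuilding it
// from the event history themselves.
type RoomServer interface {
	StateAtEvent(ctx context.Context, roomID, eventID string) ([]LogEntry, error)
	LatestEvents(ctx context.Context, roomID string) ([]string, error)
}
```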
## How things are supposed to work.
### Local client sends an event in an existing room.
0) The client sends a PUT `/_matrix/client/r0/rooms/{roomId}/send` request
and an HTTP loadbalancer routes the request to a ClientAPI.
1) The ClientAPI:
* Authenticates the local user using the `access_token` sent in the HTTP
request.
* Checks if it has already processed or is processing a request with the
same `txnID`.
* Calculates which state events are needed to auth the request.
* Queries the necessary state events and the latest events in the room
from the RoomServer.
* Confirms that the room exists and checks whether the event is allowed by
the auth checks.
* Builds and signs the events.
* Writes the event to an "InputRoomEvent" Kafka topic.
* Sends a `200 OK` response to the client.
2) The RoomServer reads the event from the "InputRoomEvent" Kafka topic:
* Checks if it already has a copy of the event.
* Checks if the event is allowed by the auth checks using the auth events
at the event.
* Calculates the room state at the event.
* Works out what the latest events in the room after processing this event
are.
* Calculates how the changes in the latest events affect the current state
of the room.
* TODO: Work out which events determine the visibility of this event to other
users.
* Writes the event along with the changes in current state to an
"OutputRoomEvent" Kafka topic. It writes all the events for a room to
the same Kafka partition (see the sketch after step 3b below).
3a) The ClientSync reads the event from the "OutputRoomEvent" Kafka topic:
* Updates its copy of the current state for the room.
* Works out which users need to be notified about the event.
* Wakes up any pending `/_matrix/client/r0/sync` requests for those users.
* Adds the event to the recent timeline events for the room.
3b) The FederationSender reads the event from the "OutputRoomEvent" Kafka topic:
* Updates its copy of the current state for the room.
* Works out which remote servers need to be notified about the event.
* Sends a `/_matrix/federation/v1/send` request to those servers.
* Or, if there is a request in progress, adds the event to a queue to be
sent when the previous request finishes.
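Step 2 above writes all the events for a room to the same Kafka partition so that they stay totally ordered. A minimal sketch of how a producer might pick that partition (assuming a fixed partition count; `partitionForRoom` is a hypothetical helper, not the actual producer code):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionForRoom maps a room ID to a stable Kafka partition, so every
// event in one room lands on one partition and is consumed in order.
func partitionForRoom(roomID string, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(roomID))
	return int32(h.Sum32() % uint32(numPartitions))
}

func main() {
	fmt.Println(partitionForRoom("!abc:example.org", 16))
}
```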
### Remote server sends an event in an existing room.
0) The remote server sends a `PUT /_matrix/federation/v1/send` request and an
HTTP loadbalancer routes the request to a FederationReceiver.
1) The FederationReceiver:
* Authenticates the remote server using the "X-Matrix" authorisation header.
* Checks if it has already processed or is processing a request with the
same `txnID`.
* Checks the signatures for the events.
Fetches the ed25519 keys for the event senders if necessary.
* Queries the RoomServer for a copy of the state of the room at each event.
* If the RoomServer doesn't know the state of the room at an event then
query the state of the room at the event from the remote server using
`GET /_matrix/federation/v1/state_ids`, falling back to
`GET /_matrix/federation/v1/state` if necessary (a sketch follows this flow).
* Once the state at each event is known check whether the events are
allowed by the auth checks against the state at each event.
* For each event that is allowed, writes the event to the "InputRoomEvent"
Kafka topic.
* Sends a `200 OK` response to the remote server listing which events were
successfully processed and which events failed.
2) The RoomServer processes the event the same as it would a local event.
3a) The ClientSync processes the event the same as it would a local event.
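A sketch of the `/state_ids`-with-fallback query from step 1 (the helper and HTTP wiring are assumptions for illustration; real federation requests must also carry an `X-Matrix` authorization header):

```go
package main

import (
	"context"
	"net/http"
	"net/url"
)

// fetchStateAtEvent asks the remote server for the room state at an event,
// preferring /state_ids and falling back to the full /state endpoint.
func fetchStateAtEvent(ctx context.Context, cli *http.Client, server, roomID, eventID string) (*http.Response, error) {
	q := url.Values{"event_id": {eventID}}.Encode()
	base := "https://" + server + "/_matrix/federation/v1"
	req, err := http.NewRequest("GET", base+"/state_ids/"+url.PathEscape(roomID)+"?"+q, nil)
	if err != nil {
		return nil, err
	}
	resp, err := cli.Do(req.WithContext(ctx))
	if err == nil && resp.StatusCode == http.StatusOK {
		return resp, nil
	}
	if resp != nil {
		resp.Body.Close()
	}
	// Fall back to the heavier /state endpoint.
	req, err = http.NewRequest("GET", base+"/state/"+url.PathEscape(roomID)+"?"+q, nil)
	if err != nil {
		return nil, err
	}
	return cli.Do(req.WithContext(ctx))
}
```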
FROM alpine
RUN mkdir -p /mnt/data/logs
RUN mkdir -p /opt/ligase/log
#RUN apk add --update-cache ca-certificates
RUN apk add librdkafka
ENV LOG_DIR=/mnt/data/logs
ENV SERVICE_NAME=monolith
ADD ./config /opt/ligase/config
ADD ./bin /opt/ligase/bin
ADD ./start.sh /opt/ligase/start.sh
#EXPOSE 8008 8448 7000
EXPOSE 8008 8448 7000 18008 18448
WORKDIR /opt/ligase
CMD ./start.sh
# Installing Dendrite
# Installing Ligase
Dendrite can be run in one of two configurations:
Ligase is designed to be a Cloud Native application. We recommend deploying Ligase via Docker.
* A cluster of individual components, dealing with different aspects of the
Matrix protocol (see [WIRING.md](./WIRING.md)). Components communicate with
one another via [Apache Kafka](https://kafka.apache.org).
* A monolith server, in which all components run in the same process. In this
configuration, Kafka can be replaced with an in-process implementation
called [naffka](https://github.com/matrix-org/naffka).
This document shows how to start up Ligase on a single machine.
## Requirements
- Go 1.8+
- Postgres 9.5+
- For Kafka (optional if using the monolith server):
- Unix-based system (https://kafka.apache.org/documentation/#os)
- JDK 1.8+ / OpenJDK 1.8+
- Apache Kafka 0.10.2+ (see [scripts/install-local-kafka.sh](scripts/install-local-kafka.sh) for up-to-date version numbers)
## Setting up a development environment
Assumes Go 1.8 and JDK 1.8 are already installed and are on PATH.
```bash
# Get the code
git clone https://github.com/matrix-org/dendrite
cd dendrite
# Build it
go get github.com/constabulary/gb/...
gb build
```
If using Kafka, install and start it (c.f. [scripts/install-local-kafka.sh](scripts/install-local-kafka.sh)):
```bash
MIRROR=http://apache.mirror.anlx.net/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
# Only download the kafka if it isn't already downloaded.
test -f kafka.tgz || wget $MIRROR -O kafka.tgz
# Unpack the kafka over the top of any existing installation
mkdir -p kafka && tar xzf kafka.tgz -C kafka --strip-components 1
# Start the zookeeper running in the background.
# By default the zookeeper listens on localhost:2181
kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties
# Start the kafka server running in the background.
# By default the kafka listens on localhost:9092
kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties
```
## Configuration
### Postgres database setup
Dendrite requires a postgres database engine, version 9.5 or later.
* Create role:
```bash
sudo -u postgres createuser -P dendrite # prompts for password
```
* Create databases:
```bash
for i in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi; do
sudo -u postgres createdb -O dendrite dendrite_$i
done
```
### Crypto key generation
Generate the keys:
```bash
# Generate a self-signed SSL cert for federation:
test -f server.key || openssl req -x509 -newkey rsa:4096 -keyout server.key -out server.crt -days 3650 -nodes -subj /CN=localhost
# generate ed25519 signing key
test -f matrix_key.pem || ./bin/generate-keys -private-key matrix_key.pem
```
### Configuration
Create a config file based on `dendrite-config.yaml`; call it `dendrite.yaml`. Things that will need editing include *at least*:
* `server_name`
* `database/*`
## Starting a monolith server
It is possible to use 'naffka' as an in-process replacement for Kafka when using
the monolith server. To do this, set `use_naffka: true` in `dendrite.yaml`.
* Go 1.13 or higher
* Postgres 9.5 or higher
* Apache Kafka 0.10.2+
The monolith server can be started as shown below. By default it listens for
HTTP connections on port 8008, so point your client at
`http://localhost:8008`. If you set `--tls-cert` and `--tls-key` as shown
below, it will also listen for HTTPS connections on port 8448.
## Set up dependent services
The recommended way:
```bash
./bin/dendrite-monolith-server --tls-cert=server.crt --tls-key=server.key
docker-compose up -d
```
## Starting a multiprocess server
## Build
The following contains scripts which will run all the required processes in order to point a Matrix client at Dendrite. Conceptually, you are wiring the components together to form the following diagram:
```
/media +---------------------------+
+----------->+------------->| dendrite-media-api-server |
^ ^ +---------------------------+
| | :7774
| |
| |
| | /directory +----------------------------------+
| | +--------->| dendrite-public-rooms-api-server |<========++
| | | +----------------------------------+ ||
| | | :7775 | ||
| | | +<-----------+ ||
| | | | ||
| | | /sync +--------------------------+ ||
| | +--------->| dendrite-sync-api-server |<================++
| | | | +--------------------------+ ||
| | | | :7773 | ^^ ||
Matrix +------------------+ | | | | || client_data ||
Clients --->| client-api-proxy |-------+ +<-----------+ ++=============++ ||
+------------------+ | | | || ||
:8008 | | CS API +----------------------------+ || ||
| +--------->| dendrite-client-api-server |==++ ||
| | +----------------------------+ ||
| | :7771 | ||
| | | ||
| +<-----------+ ||
| | ||
| | ||
| | +----------------------+ room_event ||
| +---------->| dendrite-room-server |===============++
| | +----------------------+ ||
| | :7770 ||
| | ++==========================++
| +<------------+ ||
| | | VV
| | +-----------------------------------+ Matrix
| | | dendrite-federation-sender-server |------------> Servers
| | +-----------------------------------+
| | :7776
| |
+---------->+ +<-----------+
| |
Matrix +----------------------+ SS API +--------------------------------+
Servers --->| federation-api-proxy |--------->| dendrite-federation-api-server |
+----------------------+ +--------------------------------+
:8448 :7772
A --> B = HTTP requests (A = client, B = server)
A ==> B = Kafka (A = producer, B = consumer)
```
### Run a client api proxy
This is what Matrix clients will talk to. If you use the script below, point your client at `http://localhost:8008`.
```bash
./bin/client-api-proxy \
--bind-address ":8008" \
--client-api-server-url "http://localhost:7771" \
--sync-api-server-url "http://localhost:7773" \
--media-api-server-url "http://localhost:7774" \
--public-rooms-api-server-url "http://localhost:7775"
```
### Run a client api
This is what implements message sending. Clients talk to this via the proxy in order to send messages.
```bash
./bin/dendrite-client-api-server --config=dendrite.yaml
```
(If this fails with `pq: syntax error at or near "ON"`, check you are using at least postgres 9.5.)
### Run a room server
This is what implements the room DAG. Clients do not talk to this.
### Build for docker
```bash
./bin/dendrite-room-server --config=dendrite.yaml
./build.sh
docker build -t ligase .
```
### Run a sync server
This is what implements `/sync` requests. Clients talk to this via the proxy in order to receive messages.
### Build for local host
```bash
./bin/dendrite-sync-api-server --config dendrite.yaml
./build.sh
```
### Run a media server
This implements `/media` requests. Clients talk to this via the proxy in order to upload and retrieve media.
Add the following entries to **/etc/hosts** if you want to run Ligase on your local host.
```bash
./bin/dendrite-media-api-server --config dendrite.yaml
```

```shell
127.0.0.1 pg-master
127.0.0.1 zookeeper
127.0.0.1 kafka
127.0.0.1 redis
127.0.0.1 nats
```
### Run public room server
## Configuration
This implements `/directory` requests. Clients talk to this via the proxy in order to retrieve room directory listings.
Replace `./config/config.yaml` with your own configuration if you didn't use the recommended way to set up dependent services.
```bash
./bin/dendrite-public-rooms-api-server --config dendrite.yaml
```
### Run a federation api proxy
## Run
This is what Matrix servers will talk to. This is only required if you want to support federation.
## Run in docker
```bash
./bin/federation-api-proxy \
--bind-address ":8448" \
--federation-api-url "http://localhost:7772" \
--media-api-server-url "http://localhost:7774"
```

```sh
docker run --network ligase_default --expose 8008 --detach --name ligase ligase
```
### Run a federation api server
This implements federation requests. Servers talk to this via the proxy in
order to send transactions. This is only required if you want to support
federation.
### Run on local host
```bash
./bin/dendrite-federation-api-server --config dendrite.yaml
```
1. To run Ligase on your local host, first follow the steps at https://github.com/edenhill/librdkafka to install librdkafka.
### Run a federation sender server
2. Then start Ligase by:
This sends events from our users to other servers. This is only required if
you want to support federation.
```bash
./bin/dendrite-federation-sender-server --config dendrite.yaml
```
```sh
export SERVICE_NAME=monolith
./start.sh
```
test:
@echo "test passed"
check:
fmt:
@echo "gofmt (simplify)"
go fmt ./...
# Dendrite [![Build Status](https://travis-ci.org/matrix-org/dendrite.svg?branch=master)](https://travis-ci.org/matrix-org/dendrite)
# Introduction
Dendrite will be a Matrix homeserver written in Go.
Ligase is a Golang-based implementation of a Matrix homeserver, following the Matrix spec as defined at matrix.org. It has been used in production by an array of financial institutions in various scenarios, including but not limited to OTC-style trading (i.e. in the bond trading market), collaborative workspace, stock brokerage, retail banking and more. It usually serves as the core to support more sophisticated financial applications running in secured, on-premise and regulation-compliant environments. But nothing limits it as a powerful Instant Messaging server as well as a general purpose, open, Messaging As A Platform (MaaP) solution.
It's still very much a work in progress, but installation instructions can
be found in [INSTALL.md](INSTALL.md)
# Alternatives to Synapse and Dendrite
An overview of the design can be found in [DESIGN.md](DESIGN.md)
Synapse and Dendrite are two reference implementations based upon the Matrix spec, implemented in Python and Golang, respectively. Here we provide yet another alternative, field-tested by a number of banks, stock brokerage houses and fintech companies since 2018. Financial applications require an IM with lower latency, better scalability, higher concurrency and stronger backend monitoring capabilities. We therefore derived a branch originally from Dendrite but have since parted ways with it, drilling into a completely different implementation with a different architecture.
# Contributing
# Architecture and other technological considerations
While sticking to the Matrix spec as much as possible, this implementation has adopted the following approaches (a toy sketch follows the list):
* a Fan-in/Fan-out topology
* a CQRS (Command and Query Responsibility Segregation) pattern
* leveraging Kafka for event sourcing and stream message storage
* micro-service based with full leverage of containers (e.g. Docker) and container orchestration platforms (e.g. Kubernetes/Rancher)
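A toy sketch of the CQRS split, with a buffered channel standing in for the Kafka event log (illustrative only; the names do not correspond to the Ligase codebase):

```go
package main

import "fmt"

// Event flows through the append-only log from the command side (writers)
// to the query side (readers), which build their own read models.
type Event struct {
	RoomID, Body string
}

func write(log chan<- Event, e Event) {
	log <- e // fan-in: all writers append to one log
}

func read(log <-chan Event, view map[string][]string, done chan<- struct{}) {
	for e := range log {
		view[e.RoomID] = append(view[e.RoomID], e.Body) // fan-out: per-reader view
	}
	done <- struct{}{}
}

func main() {
	log := make(chan Event, 8)
	view := map[string][]string{}
	done := make(chan struct{})
	go read(log, view, done)
	write(log, Event{"!room:example.org", "hello"})
	close(log)
	<-done
	fmt.Println(view) // queries hit the read model, never the write path
}
```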
# A Cloud-native K8S application
Everyone is welcome to help out and contribute! See [CONTRIBUTING.md](CONTRIBUTING.md)
to get started!
The implementation will gradually take advantage of cloud facilities so as to reach the state of being cloud-native, inherently supporting DevOps. It deploys via docker-compose as a single-node deployment, but it also meets enterprise-grade requirements to run as a native K8S application, leveraging K8S's elasticity and resiliency. Integration with Prometheus gives IT operators a powerful monitoring tool.
# Installation
Read the [INSTALL](./INSTALL.md)
# Contributing
We aim to try and make it as easy as possible to jump in.
Read the [CONTRIBUTING](./CONTRIBUTING.md)
# Discussion
# Troubleshooting
For questions about Dendrite we have a dedicated room on Matrix
[#dendrite:matrix.org](https://riot.im/develop/#/room/#dendrite:matrix.org).
Problems and known solutions.
# Progress
# Support channels
There's plenty still to do to make Dendrite usable! We're tracking progress in
a [spreadsheet](https://docs.google.com/spreadsheets/d/1tkMNpIpPjvuDJWjPFbw_xzNzOHBA-Hp50Rkpcr43xTw).
Slack/Reddit/StackOverflow/FinChat
# Wiring
The diagram is incomplete. The following things aren't shown on the diagram:
- [ ] Device Messages
- [ ] User Profiles
- [ ] Notification Counts
- [ ] Sending federation.
- [ ] Querying federation.
- [ ] Other things that aren't shown on the diagram.
Diagram:
W -> Writer
S -> Server/Store/Service/Something/Stuff
R -> Reader
+---+ +---+ +---+
+----------| W | +----------| S | +--------| R |
| +---+ | Receipts +---+ | Client +---+
| Federation |>=========================================>| Server |>=====================>| Sync |
| Receiver | | | | |
| | +---+ | | | |
| | +--------| W | | | | |
| | | Client +---+ | | | |
| | | Receipt |>=====>| | | |
| | | Updater | | | | |
| | +----------+ | | | |
| | | | | |
| | +---+ +---+ | | +---+ | |
| | +------------| W | +------| S | | | +--------| R | | |
| | | Federation +---+ | Room +---+ | | | Client +---+ | |
| | | Backfill |>=====>| Server |>=====>| |>=====>| Push | | |
| | +--------------+ | | +------------+ | | | |
| | | | | | | |
| | | |>==========================>| | | |
| | | | +----------+ | |
| | | | | |
| | | | +---+ | |
| | | | +--------| R | | |
| | | | | Client +---+ | |
| |>========================>| |>==========================>| Search | | |
| | | | | | | |
| | | | +----------+ | |
| | | | | |
| | | |>==========================================>| |
| | | | | |
| | +---+ | | +---+ | |
| | +--------| W | | | +----------| S | | |
| | | Client +---+ | | | Presence +---+ | |
| | | API |>=====>| |>=====>| Server |>=====================>| |
| | | /send | +--------+ | | | |
| | | | | | | |
| | | |>======================>| |<=====================<| |
| | +----------+ | | | |
| | | | | |
| | +---+ | | | |
| | +--------| W | | | | |
| | | Client +---+ | | | |
| | | Presence |>=====>| | | |
| | | Setter | | | | |
| | +----------+ | | | |
| | | | | |
| | | | | |
| |>=========================================>| | | |
| | +------------+ | |
| | | |
| | +---+ | |
| | +----------| S | | |
| | | Typing +---+ | |
| |>=========================================>| Server |>=====================>| |
+------------+ | | +----------+
+---+ | |
+--------| W | | |
| Client +---+ | |
| Typing |>=====>| |
| Setter | | |
+----------+ +------------+
# Component Descriptions
Many of the components are logical rather than physical. For example it is
possible that all of the client API writers will end up being glued together
and always deployed as a single unit.
Outbound federation requests will probably need to be funnelled through a
choke-point to implement ratelimiting and backoff correctly.
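One plausible shape for such a choke-point, sketched with `golang.org/x/time/rate` (an assumption for illustration, not the actual implementation):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// chokePoint funnels all outbound federation requests through one limiter,
// so ratelimiting (and, by extension, backoff) lives in a single place.
type chokePoint struct {
	limiter *rate.Limiter
}

func (c *chokePoint) send(ctx context.Context, destination string) error {
	// Wait blocks until the limiter allows another request or ctx expires.
	if err := c.limiter.Wait(ctx); err != nil {
		return err
	}
	fmt.Println("sending to", destination)
	return nil
}

func main() {
	c := &chokePoint{limiter: rate.NewLimiter(rate.Every(100*time.Millisecond), 1)}
	for _, dst := range []string{"a.example", "b.example"} {
		if err := c.send(context.Background(), dst); err != nil {
			fmt.Println("dropped:", err)
		}
	}
}
```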
## Federation Send
* Handles `/federation/v1/send/` requests.
* Fetches missing `prev_events` from the remote server if needed.
* Fetches missing room state from the remote server if needed.
* Checks signatures on remote events, downloading keys if needed.
* Queries information needed to process events from the Room Server.
* Writes room events to logs.
* Writes presence updates to logs.
* Writes receipt updates to logs.
* Writes typing updates to logs.
* Writes other updates to logs.
## Client API /send
* Handles puts to `/client/v1/rooms/` that create room events.
* Queries information needed to process events from the Room Server.
* Talks to remote servers if needed for joins and invites.
* Writes room event PDUs.
* Writes presence updates to logs.
## Client Presence Setter
* Handles puts to whatever the client API path for presence is?
* Writes presence updates to logs.
## Client Typing Setter
* Handles puts to whatever the client API path for typing is?
* Writes typing updates to logs.
## Client Receipt Updater
* Handles puts to whatever the client API path for receipts is?
* Writes receipt updates to logs.
## Federation Backfill
* Backfills events from other servers
* Writes the resulting room events to logs.
* Is a separate component from the room server itself because it'll
be easier if the room server component isn't making outbound HTTP requests
to remote servers.
## Room Server
* Reads new and backfilled room events from the logs written by FS, FB and CRS.
* Tracks the current state of the room and the state at each event.
* Probably does auth checks on the incoming events.
* Handles state resolution as part of working out the current state and the
state at each event.
* Writes updates to the current state and new events to logs.
* Shards by room ID.
## Receipt Server
* Reads new updates to receipts from the logs written by the FS and CRU.
* Somehow learns enough information from the room server to work out how the
current receipt markers move with each update.
* Writes the new marker positions to logs.
* Shards by room ID?
* It may be impossible to implement without folding it into the Room Server,
forever coupling the components together.
## Typing Server
* Reads new updates to typing from the logs written by the FS and CTS.
* Updates the current list of people typing in a room.
* Writes the current list of people typing in a room to the logs.
* Shards by room ID?
## Presence Server
* Reads the current state of the rooms from the logs to track the intersection
of room membership between users.
* Reads updates to presence from the logs written by the FS and the CPS.
* Reads when clients sync from the logs from the Client Sync.
* Tracks any timers for users.
* Writes the changes to presence state to the logs.
* Shards by user ID somehow?
## Client Sync
* Handles `/client/v2/sync` requests.
* Reads new events and the current state of the rooms from logs written by the Room Server.
* Reads new receipts positions from the logs written by the Receipts Server.
* Reads changes to presence from the logs written by the Presence Server.
* Reads changes to typing from the logs written by the Typing Server.
* Writes when a client starts and stops syncing to the logs.
## Client Search
* Handles whatever the client API path for event search is?
* Reads new events and the current state of the rooms from logs written by the Room Server.
* Maintains a full-text search index of some kind.
## Client Push
* Pushes unread messages to remote push servers.
* Reads new events and the current state of the rooms from logs written by the Room Server.
* Reads the position of the read marker from the Receipts Server.
* Makes outbound HTTP requests to the push server for the client device.
// Copyright (C) 2020 Finogeeks Co., Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package adapter
import (
"math/rand"
"reflect"
"time"
)
const (
DEBUG_LEVEL_DEBUG = "debug"
DEBUG_LEVEL_PROD = "prod"
)
type KafkaCommonCfg struct {
EnableIdempotence bool
// ForceAsyncSend forces async send; use it only when Kafka sync send is problematic.
ForceAsyncSend bool
ReplicaFactor int
NumPartitions int
NumProducers int
}
type DomainCfg struct {
Domains []string
}
type LockCfg struct {
Timeout int
Wait int
Force bool
}
type DistLockCfg struct {
LockInstance LockCfg
LockRoomState LockCfg
LockRoomStateExt LockCfg
}
type DebugCfg struct {
DebugLevel string
}
type CommonCfg struct {
Kafka KafkaCommonCfg
Domain DomainCfg
DistLock DistLockCfg
Debug DebugCfg
}
var AdapterCfg CommonCfg
func SetKafkaEnableIdempotence(enableIdempotence bool) {
AdapterCfg.Kafka.EnableIdempotence = enableIdempotence
}
func GetKafkaEnableIdempotence() bool {
return AdapterCfg.Kafka.EnableIdempotence
}
func SetKafkaForceAsyncSend(forceAsyncSend bool) {
AdapterCfg.Kafka.ForceAsyncSend = forceAsyncSend
}
func GetKafkaForceAsyncSend() bool {
return AdapterCfg.Kafka.ForceAsyncSend
}
func SetKafkaReplicaFactor(replicaFactor int) {
AdapterCfg.Kafka.ReplicaFactor = replicaFactor
}
func GetKafkaReplicaFactor() int {
return AdapterCfg.Kafka.ReplicaFactor
}
func SetKafkaNumPartitions(numPartitions int) {
AdapterCfg.Kafka.NumPartitions = numPartitions
}
func GetKafkaNumPartitions() int {
return AdapterCfg.Kafka.NumPartitions
}
func SetKafkaNumProducers(numProducers int) {
AdapterCfg.Kafka.NumProducers = numProducers
}
func GetKafkaNumProducers() int {
return AdapterCfg.Kafka.NumProducers
}
func SetDomainCfg(domains []string) {
AdapterCfg.Domain.Domains = domains
}
func GetDomainCfg() []string {
return AdapterCfg.Domain.Domains
}
func Contains(obj interface{}, target interface{}) bool {
targetValue := reflect.ValueOf(target)
switch reflect.TypeOf(target).Kind() {
case reflect.Slice, reflect.Array:
for i := 0; i < targetValue.Len(); i++ {
if targetValue.Index(i).Interface() == obj {
return true
}
}
case reflect.Map:
if targetValue.MapIndex(reflect.ValueOf(obj)).IsValid() {
return true
}
}
return false
}
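// Example usage of Contains (illustrative):
//
//	Contains("a.com", []string{"a.com", "b.com"}) // true: slice element match
//	Contains("key", map[string]int{"key": 1})     // true: map key match
//	Contains("x", []string{"a.com", "b.com"})     // false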
func CheckSameDomain(domains []string) bool {
checkDomains := GetDomainCfg()
for _, domain := range domains {
if !Contains(domain, checkDomains) {
return false
}
}
return true
}
func GetDistLockCfg() DistLockCfg {
return AdapterCfg.DistLock
}
func SetDistLockItemCfg(item string, timeout, wait int, force bool) {
lockCfg := LockCfg{
Timeout: timeout,
Wait: wait,
Force: force,
}
switch item {
case "instance":
AdapterCfg.DistLock.LockInstance = lockCfg
case "room_state":
AdapterCfg.DistLock.LockRoomState = lockCfg
case "room_state_ext":
AdapterCfg.DistLock.LockRoomStateExt = lockCfg
}
}
func SetDebugLevel(debugLevel string) {
AdapterCfg.Debug.DebugLevel = debugLevel
}
func GetDebugLevel() string {
return AdapterCfg.Debug.DebugLevel
}
// Random returns a pseudo-random integer in [min, max). Note that it reseeds
// the shared source on every call, which is unnecessary but harmless.
func Random(min, max int) int {
rand.Seed(time.Now().UnixNano())
return rand.Intn(max-min) + min
}
// Copyright (C) 2020 Finogeeks Co., Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package appservice
import (
"github.com/finogeeks/ligase/appservice/consumers"
"github.com/finogeeks/ligase/appservice/types"
"github.com/finogeeks/ligase/appservice/workers"
"github.com/finogeeks/ligase/common/basecomponent"
"github.com/finogeeks/ligase/skunkworks/log"
"sync"
)
// SetupApplicationServiceComponent sets up the application service component.
// Because the roomserver API implementation is incomplete, the database is
// needed when looking up room aliases; normally this should be done via the API.
func SetupApplicationServiceComponent(base *basecomponent.BaseDendrite) {
applicationServiceDB := base.CreateApplicationServiceDB()
roomserverDB := base.CreateRoomDB()
// Wrap application services in a type that relates the application service and
// a sync.Cond object that can be used to notify workers when there are new
// events to be sent out.
// Each appservice gets its own worker.
workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices))
for i, appservice := range base.Cfg.Derived.ApplicationServices {
m := sync.Mutex{}
ws := types.ApplicationServiceWorkerState{
AppService: appservice,
Cond: sync.NewCond(&m),
}
workerStates[i] = ws
}
consumer := consumers.NewOutputRoomEventConsumer(base.Cfg, applicationServiceDB, roomserverDB,
workerStates)
if err := consumer.Start(); err != nil {
log.Panicw("failed to start room server consumer", log.KeysAndValues{"error", err})
}
// Create application service transaction workers
if err := workers.SetupTransactionWorkers(applicationServiceDB, workerStates); err != nil {
log.Panicw("failed to start app service transaction workers", log.KeysAndValues{"error", err})
}
}
// Copyright (C) 2020 Finogeeks Co., Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package consumers
import (
"context"
"github.com/finogeeks/ligase/appservice/types"
"github.com/finogeeks/ligase/common"
"github.com/finogeeks/ligase/common/config"
"github.com/finogeeks/ligase/core"
"github.com/finogeeks/ligase/model/service/roomserverapi"
"github.com/finogeeks/ligase/skunkworks/gomatrixserverlib"
"github.com/finogeeks/ligase/storage/model"
jsoniter "github.com/json-iterator/go"
log "github.com/finogeeks/ligase/skunkworks/log"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
channel core.IChannel
asDB model.AppServiceDatabase
rsDB model.RoomServerDatabase
workerStates []types.ApplicationServiceWorkerState
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
cfg *config.Dendrite,
appserviceDB model.AppServiceDatabase,
rsDB model.RoomServerDatabase,
workerStates []types.ApplicationServiceWorkerState,
) *OutputRoomEventConsumer {
val, ok := common.GetTransportMultiplexer().GetChannel(
cfg.Kafka.Consumer.OutputRoomEventAppservice.Underlying,
cfg.Kafka.Consumer.OutputRoomEventAppservice.Name,
)
if ok {
channel := val.(core.IChannel)
c := &OutputRoomEventConsumer{
channel: channel,
asDB: appserviceDB,
rsDB: rsDB,
workerStates: workerStates,
}
channel.SetHandler(c)
return c
}
return nil
}
// Start consuming from room servers
func (c *OutputRoomEventConsumer) Start() error {
//c.channel.Start()
return nil
}
// OnMessage is called when the consumer receives a new event from the room server output log.
// It is not safe for this function to be called from multiple goroutines, or else the
// sync stream position may race and be incorrectly calculated.
func (c *OutputRoomEventConsumer) OnMessage(ctx context.Context, topic string, partition int32, data []byte, rawMsg interface{}) {
// Parse out the event JSON
var output roomserverapi.OutputEvent
if err := json.Unmarshal(data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.Errorw("applicationservice: message parse failure", log.KeysAndValues{"error", err})
return
}
if output.Type != roomserverapi.OutputTypeNewRoomEvent {
log.Debugw("applicationservice: ignoring unknown output type", log.KeysAndValues{"topic", topic, "type", output.Type})
return
}
ev := &output.NewRoomEvent.Event
log.Infow("applicationservice received event from roomserver", log.KeysAndValues{"event_id", ev.EventID, "room_id", ev.RoomID, "type", ev.Type})
// TODO: get the missing events from the event fields
missingEvents := []gomatrixserverlib.ClientEvent{}
events := append(missingEvents, *ev)
// Send event to any relevant application services
c.filterRoomserverEvents(ctx, events)
//c.AsHandler.NotifyInterestedAppServices(context.Background(), ev)
}
// filterRoomserverEvents takes in events and decides whether any of them need
// to be passed on to an external application service. It does this by checking
// each namespace of each registered application service, and if there is a
// match, adds the event to the queue for events to be sent to a particular
// application service.
func (s *OutputRoomEventConsumer) filterRoomserverEvents(
ctx context.Context,
events []gomatrixserverlib.ClientEvent,
) error {
for _, ws := range s.workerStates {
for _, event := range events {
// Check if this event is interesting to this application service
if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) {
// Queue this event to be sent off to the application service
if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, &event); err != nil {
log.Warnw("failed to insert incoming event into appservices database", log.KeysAndValues{"error", err})
} else {
// Tell our worker to send out new messages by updating remaining message
// count and waking them up with a broadcast
ws.NotifyNewEvents()
}
}
}
}
return nil
}
// appserviceIsInterestedInEvent returns a boolean depending on whether a given
// event falls within one of a given application service's namespaces.
func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event gomatrixserverlib.ClientEvent, appservice config.ApplicationService) bool {
// No reason to queue events if they'll never be sent to the application
// service
if appservice.URL == "" {
return false
}
// Check Room ID and Sender of the event
if appservice.IsInterestedInUserID(event.Sender) ||
appservice.IsInterestedInRoomID(event.RoomID) {
log.Infow("interest in this event", log.KeysAndValues{"room_id", event.RoomID})
return true
}
if appservice.InterestedAll {
return true
}
// Note: alias-based filtering is removed here so that we no longer query the
// roomserver DB, but filtering by room alias no longer works as a result.
// It is unused for now; the roomserver should eventually support this via an API.
//aliasList, err := s.rsDB.GetAliasesFromRoomID(context.Background(), event.RoomID)
//// Check all known room aliases of the room the event came from
//if err == nil {
// for _, alias := range aliasList {
// if appservice.IsInterestedInRoomAlias(alias) {
// return true
// }
// }
//} else {
// log.WithFields(log.Fields{
// "room_id": event.RoomID(),
// }).WithError(err).Errorf("Unable to get aliases for room")
//}
return false
}
// Copyright (C) 2020 Finogeeks Co., Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package types
import (
"github.com/finogeeks/ligase/common/config"
"sync"
)
// ApplicationServiceWorkerState is a type that couples an application service,
// a lockable condition as well as some other state variables, allowing the
// roomserver to notify appservice workers when there are events ready to send
// externally to application services.
type ApplicationServiceWorkerState struct {
AppService config.ApplicationService
Cond *sync.Cond
// Events ready to be sent
EventsReady bool
// Backoff exponent (2^x secs). Max 6, aka 64s.
Backoff int
}
// NotifyNewEvents wakes up all waiting goroutines, notifying that events remain
// in the event queue for this application service worker.
func (a *ApplicationServiceWorkerState) NotifyNewEvents() {
a.Cond.L.Lock()
a.EventsReady = true
a.Cond.Broadcast()
a.Cond.L.Unlock()
}
// FinishEventProcessing marks all events of this worker as being sent to the
// application service.
func (a *ApplicationServiceWorkerState) FinishEventProcessing() {
a.Cond.L.Lock()
a.EventsReady = false
a.Cond.L.Unlock()
}
// WaitForNewEvents causes the calling goroutine to wait on the worker state's
// condition for a broadcast or similar wakeup, if there are no events ready.
func (a *ApplicationServiceWorkerState) WaitForNewEvents() {
a.Cond.L.Lock()
if !a.EventsReady {
a.Cond.Wait()
}
a.Cond.L.Unlock()
}
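// A minimal sketch of the intended protocol between an event producer and a
// worker goroutine (illustrative only; `producer` is hypothetical):
//
//	m := sync.Mutex{}
//	ws := ApplicationServiceWorkerState{Cond: sync.NewCond(&m)}
//	go producer(&ws)          // calls ws.NotifyNewEvents() once events are queued
//	ws.WaitForNewEvents()     // worker blocks until notified
//	// ... batch and send the queued events ...
//	ws.FinishEventProcessing()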
// Copyright 2018 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
// Modifications copyright (C) 2020 Finogeeks Co., Ltd
package workers
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/finogeeks/ligase/common"
"math"
"net/http"
"time"
"github.com/finogeeks/ligase/appservice/types"
"github.com/finogeeks/ligase/common/config"
"github.com/finogeeks/ligase/skunkworks/gomatrixserverlib"
log "github.com/finogeeks/ligase/skunkworks/log"
"github.com/finogeeks/ligase/storage/model"
)
var (
// Maximum size of events sent in each transaction.
transactionBatchSize = 50
// Timeout for sending a single transaction to an application service.
transactionTimeout = time.Second * 60
)
// SetupTransactionWorkers spawns a separate goroutine for each application
// service. Each of these "workers" handles taking all events intended for its
// app service, batching them up into a single transaction (up to a max transaction
// size), then sending that off to the AS's /transactions/{txnID} endpoint. It also
// handles exponentially backing off in case the AS isn't currently available.
func SetupTransactionWorkers(
appserviceDB model.AppServiceDatabase,
workerStates []types.ApplicationServiceWorkerState,
) error {
// Create a worker per application service that handles transmitting its events
for _, workerState := range workerStates {
log.Infof("start workerState for %s", workerState.AppService.URL)
// Don't create a worker if this AS doesn't want to receive events
if workerState.AppService.URL != "" {
go worker(appserviceDB, workerState)
}
}
return nil
}
// worker is a goroutine that sends any queued events to the application service
// it is given.
func worker(db model.AppServiceDatabase, ws types.ApplicationServiceWorkerState) {
log.Infow("starting application service", log.KeysAndValues{"appservice", ws.AppService.ID})
span, ctx := common.StartSobSomSpan(context.Background(), "transaction_scheduler.worker")
defer span.Finish()
// Create a HTTP client for sending requests to app services
client := &http.Client{
Timeout: transactionTimeout,
}
// Initial check for any leftover events to send from last time
eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID)
if err != nil {
log.Fatalw("appservice worker unable to read queued events from DB", log.KeysAndValues{"appservice", ws.AppService.ID, "error", err})
return
}
if eventCount > 0 {
ws.NotifyNewEvents()
}
// Loop forever and keep waiting for more events to send
for {
// Wait for more events if we've sent all the events in the database
ws.WaitForNewEvents()
// Batch events up into a transaction
transactionJSON, txnID, maxEventID, eventsRemaining, err := createTransaction(ctx, db, ws.AppService.ID)
if err != nil {
log.Fatalw("appservice worker unable to create transaction", log.KeysAndValues{"appservice", ws.AppService.ID, "error", err})
return
}
// Send the events off to the application service
// Backoff if the application service does not respond
err = send(client, ws.AppService, txnID, transactionJSON)
if err != nil {
// Backoff
backoff(&ws, err)
continue
}
// We sent successfully, hooray!
ws.Backoff = 0
// Transactions have a maximum event size, so there may still be some events
// left over to send. Keep sending until none are left
if !eventsRemaining {
ws.FinishEventProcessing()
}
// Remove sent events from the DB
err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID)
if err != nil {
log.Fatalw("unable to remove appservice events from the database", log.KeysAndValues{"appservice", ws.AppService.ID, "error", err})
return
}
}
}
// backoff pauses the calling goroutine for 2^(backoff exponent) seconds.
func backoff(ws *types.ApplicationServiceWorkerState, err error) {
// Calculate how long to backoff for
backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff)))
backoffSeconds := time.Second * backoffDuration
log.Warnw(fmt.Sprintf("unable to send transactions successfully, backing off for %ds", backoffDuration), log.KeysAndValues{
"appservice", ws.AppService.ID, "error", err,
})
ws.Backoff++
if ws.Backoff > 6 {
ws.Backoff = 6
}
// Backoff
time.Sleep(backoffSeconds)
}
// createTransaction takes in a slice of AS events, stores them in an AS
// transaction, and JSON-encodes the results.
func createTransaction(
ctx context.Context,
db model.AppServiceDatabase,
appserviceID string,
) (
transactionJSON []byte,
txnID, maxID int,
eventsRemaining bool,
err error,
) {
// Retrieve the latest events from the DB (will return old events if they weren't successfully sent)
txnID, maxID, events, eventsRemaining, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize)
if err != nil {
log.Fatalw("appservice worker unable to read queued events from DB", log.KeysAndValues{"appservice", appserviceID, "error", err})
return
}
// Check if these events do not already have a transaction ID
if txnID == -1 {
// If not, grab next available ID from the DB
txnID, err = db.GetLatestTxnID(ctx)
if err != nil {
log.Fatalw("get latest txnid error", log.KeysAndValues{"appservice", appserviceID, "error", err})
fmt.Println("get latest txnid error")
return nil, 0, 0, false, err
}
// Mark new events with current transactionID
if err = db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, txnID); err != nil {
log.Fatalw("update txnid error", log.KeysAndValues{"appservice", appserviceID, "error", err})
fmt.Println("update txnid error")
return nil, 0, 0, false, err
}
}
// Create a transaction and store the events inside
transaction := gomatrixserverlib.ApplicationServiceTransaction{
Events: events,
}
transactionJSON, err = json.Marshal(transaction)
if err != nil {
return
}
return
}
// send sends events to an application service. Returns an error if an OK was not
// received back from the application service or the request timed out.
func send(
client *http.Client,
appservice config.ApplicationService,
txnID int,
transaction []byte,
) error {
if txnID == 0 {
return nil
}
// POST a transaction to our AS
address := fmt.Sprintf("%s/transactions/%d?access_token=%s", appservice.URL, txnID, appservice.HSToken)
req, err := http.NewRequest("PUT", address, bytes.NewBuffer(transaction))
if err != nil {
return err
}
req.Header.Add("Content-Type", "application/json")
resp, err := client.Do(req)
log.Infow("send request "+address, log.KeysAndValues{"appservice", appservice.ID})
if err != nil {
return err
}
defer func() {
err := resp.Body.Close()
if err != nil {
log.Errorw("unable to close response body from application service", log.KeysAndValues{"appservice", appservice.ID, "error", err})
}
}()
// Check the AS received the events correctly
if resp.StatusCode != http.StatusOK {
// TODO: Handle non-200 error codes from application services
return fmt.Errorf("non-OK status code %d returned from AS", resp.StatusCode)
}
return nil
}
// Copyright (C) 2020 Finogeeks Co., Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package bgmgr
import (
"github.com/finogeeks/ligase/bgmgr/devicemgr"
"github.com/finogeeks/ligase/bgmgr/txnmgr"
"github.com/finogeeks/ligase/common"
"github.com/finogeeks/ligase/common/filter"
"github.com/finogeeks/ligase/model/service"
"github.com/finogeeks/ligase/skunkworks/log"
"github.com/finogeeks/ligase/storage/model"
)
func SetupBgMgrComponent(
deviceDB model.DeviceDatabase,
cache service.Cache,
encryptDB model.EncryptorAPIDatabase,
syncDB model.SyncAPIDatabase,
rpcCli *common.RpcClient,
tokenFilter *filter.Filter,
scanUnActive int64,
kickUnActive int64,
) {
deviceMgr := devicemgr.NewDeviceMgr(deviceDB, cache, encryptDB, syncDB, rpcCli, tokenFilter, scanUnActive, kickUnActive)
log.Infof("scantime:%d,kicktime:%d", scanUnActive, kickUnActive)
deviceMgr.Start()
txnMgr := txnmgr.NewTxnMgr(cache)
txnMgr.Start()
}
// Copyright (C) 2020 Finogeeks Co., Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package devicemgr
import (
"context"
"github.com/finogeeks/ligase/clientapi/routing"
"github.com/finogeeks/ligase/common"
"github.com/finogeeks/ligase/common/filter"
"github.com/finogeeks/ligase/model/service"
"github.com/finogeeks/ligase/skunkworks/log"
"github.com/finogeeks/ligase/storage/model"
"time"
)
type DeviceMgr struct {
deviceDB model.DeviceDatabase
cache service.Cache
encryptDB model.EncryptorAPIDatabase
syncDB model.SyncAPIDatabase
rpcClient *common.RpcClient
tokenFilter *filter.Filter
scanUnActive int64
kickUnActive int64
}
func NewDeviceMgr(
deviceDB model.DeviceDatabase,
cache service.Cache,
encryptDB model.EncryptorAPIDatabase,
syncDB model.SyncAPIDatabase,
rpcClient *common.RpcClient,
tokenFilter *filter.Filter,
scanUnActive int64,
kickUnActive int64,
) *DeviceMgr {
dm := new(DeviceMgr)
dm.deviceDB = deviceDB
dm.cache = cache
dm.encryptDB = encryptDB
dm.syncDB = syncDB
dm.rpcClient = rpcClient
dm.tokenFilter = tokenFilter
dm.scanUnActive = scanUnActive
dm.kickUnActive = kickUnActive
return dm
}
func (dm *DeviceMgr) Start() {
go func() {
t := time.NewTicker(time.Millisecond * time.Duration(dm.scanUnActive))
for {
select {
case <-t.C:
func() {
span, ctx := common.StartSobSomSpan(context.Background(), "DeviceMgr.Start")
defer span.Finish()
dm.scanUnActiveDevice(ctx)
}()
}
}
}()
}
func (dm *DeviceMgr) scanUnActiveDevice(ctx context.Context) {
offset := 0
finish := false
limit := 500
// the auto-logout setting is stored in seconds
kickUnActive, _ := dm.cache.GetSetting("im.setting.autoLogoutTime")
// convert to milliseconds
kickUnActive = kickUnActive * 1000
if kickUnActive <= 0 {
kickUnActive = dm.kickUnActive
}
lastActiveTs := time.Now().UnixNano()/1000000 - kickUnActive
for {
if finish {
return
}
log.Infof("load unactive device timestamp:%d,limit:%d @offset:%d", lastActiveTs, limit, offset)
devids, dids, uids, total, err := dm.deviceDB.SelectUnActiveDevice(ctx, lastActiveTs, limit, offset)
if err != nil {
log.Errorf("load unactive device with err %v", err)
return
}
for idx := range dids {
log.Infof("kick out userId:%s,deviceId:%s", uids[idx], devids[idx])
go routing.LogoutDevice(ctx, uids[idx], devids[idx], dm.deviceDB, dm.cache, dm.encryptDB, dm.syncDB, dm.tokenFilter, dm.rpcClient)
}
if total < limit {
finish = true
} else {
offset = offset + limit
}
}
}
// Copyright (C) 2020 Finogeeks Co., Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package txnmgr
import (
"github.com/finogeeks/ligase/model/service"
"github.com/finogeeks/ligase/skunkworks/log"
"strconv"
"strings"
"time"
)
type TxnMgr struct {
cache service.Cache
ScanInterval int
}
func NewTxnMgr(
cache service.Cache,
) *TxnMgr {
return &TxnMgr{
cache: cache,
ScanInterval: 30000,
}
}
func (tm *TxnMgr) Start() {
go func() {
t := time.NewTicker(time.Millisecond * time.Duration(tm.ScanInterval))
for {
select {
case <-t.C:
tm.cleanOldTxnID()
}
}
}()
}
func (tm *TxnMgr) cleanOldTxnID() {
var cursor uint64 = 0
var count = 1000
for {
var result []string
var err error
result, cursor, err = tm.cache.ScanTxnID(cursor, count)
if err != nil {
log.Errorf("Scan msgid cursor:%d err:%v", cursor, err)
return
}
for _, key := range result {
tm.cleanExpireTxnID(key)
}
if cursor == 0 {
break
}
}
}
func (tm *TxnMgr) cleanExpireTxnID(key string) {
var cursor uint64 = 0
var count = 1000
batch := []string{}
for {
var result map[string]interface{}
var err error
result, cursor, err = tm.cache.HScan(key, "*", cursor, count)
if err != nil {
log.Errorf("HScan msgid cursor:%d err:%v", cursor, err)
return
}
for k, v := range result {
s := strings.Split(string(v.([]byte)), ":")
if len(s) <= 1 {
batch = append(batch, k)
} else {
ts, err := strconv.ParseInt(s[0], 10, 64)
if err != nil || time.Now().Unix()-ts > 30 {
batch = append(batch, k)
}
}
}
if cursor == 0 {
break
}
}
if len(batch) > 0 {
multi := make([]interface{}, len(batch))
for i, d := range batch {
multi[i] = d
}
tm.cache.HDelMulti(key, multi)
}
}
#!/bin/sh
PROJDIR=`cd $(dirname $0); pwd -P`
cd $PROJDIR
echo `pwd`
export GOPROXY=https://goproxy.io
cd $PROJDIR/cmd/engine-server
go build -v -o $PROJDIR/bin/engine-server
cd $PROJDIR/federation
go build -v -o $PROJDIR/bin/federation
cd $PROJDIR/federation
go build -v -o $PROJDIR/bin/content
cd $PROJDIR
go mod tidy
// Copyright (C) 2020 Finogeeks Co., Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cache
import (
"encoding/json"
"github.com/gomodule/redigo/redis"
)
// util
func (rc *RedisCache) encode(val interface{}) (interface{}, error) {
var value interface{}
switch v := val.(type) {
case string, int, uint, int8, int16, int32, int64, float32, float64, bool:
value = v
default:
b, err := json.Marshal(v)
if err != nil {
return nil, err
}
value = string(b)
}
return value, nil
}
// common
func (rc *RedisCache) Del(key string) error {
conn := rc.pool().Get()
defer conn.Close()
err := conn.Send("DEL", key)
if err != nil {
return err
}
return conn.Flush()
}
func (rc *RedisCache) Scan(cursor uint64, match string, count int) (result []interface{}, next uint64, err error) {
values, err := redis.Values(rc.SafeDo("scan", cursor, "match", match, "count", count))
if err != nil {
return []interface{}{}, cursor, err
}
values, err = redis.Scan(values, &cursor, &result)
if err != nil {
return []interface{}{}, cursor, err
} else {
return result, cursor, nil
}
}
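// Usage sketch (editor's addition, hypothetical helper): SCAN must be driven
// by feeding the returned cursor back in until the server reports cursor 0.
// A minimal drain loop over all keys matching a pattern:
func scanAllKeys(rc *RedisCache, match string) ([]string, error) {
    var cursor uint64
    var keys []string
    for {
        batch, next, err := rc.Scan(cursor, match, 100)
        if err != nil {
            return nil, err
        }
        for _, k := range batch {
            keys = append(keys, string(k.([]byte)))
        }
        if next == 0 { // a full iteration has completed
            return keys, nil
        }
        cursor = next
    }
}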
func (rc *RedisCache) TTL(key string) (ttl int64, err error) {
return Int64(rc.SafeDo("TTL", key))
}
// string
func (rc *RedisCache) Get(key string) (interface{}, error) {
return rc.SafeDo("GET", key)
}
func (rc *RedisCache) GetString(key string) (string, error) {
r, err := rc.Get(key)
return String(r, err)
}
func (rc *RedisCache) Exists(key string) (bool, error) {
return Bool(rc.SafeDo("EXISTS", key))
}
func (rc *RedisCache) Set(key string, val interface{}, expire int64) error {
conn := rc.pool().Get()
defer conn.Close()
value, err := rc.encode(val)
if err != nil {
return err
}
if expire > 0 {
err = conn.Send("SETEX", key, expire, value)
} else {
err = conn.Send("SET", key, value)
}
if err != nil {
return err
}
return conn.Flush()
}
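// Usage note (editor's addition): Set("k", v, 30) issues SETEX with a 30s TTL,
// while Set("k", v, 0) issues a plain SET with no expiry. Non-scalar values
// are JSON-encoded by encode() above before being written.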
// hset
func (rc *RedisCache) HGet(key, field string) (interface{}, error) {
return rc.SafeDo("HGET", key, field)
}
func (rc *RedisCache) HGetString(key, field string) (string, error) {
return String(rc.HGet(key, field))
}
func (rc *RedisCache) HSet(key, field string, val interface{}) error {
conn := rc.pool().Get()
defer conn.Close()
value, err := rc.encode(val)
if err != nil {
return err
}
err = conn.Send("HSET", key, field, value)
if err != nil {
return err
}
return conn.Flush()
}
func (rc *RedisCache) HDel(key, field string) error {
conn := rc.pool().Get()
defer conn.Close()
err := conn.Send("HDEL", key, field)
if err != nil {
return err
}
return conn.Flush()
}
func (rc *RedisCache) HScan(key, match string, cursor uint64, count int) (result map[string]interface{}, next uint64, err error) {
values, err := redis.Values(rc.SafeDo("hscan", key, cursor, "match", match, "count", count))
if err != nil {
return nil, cursor, err
}
var res []interface{}
values, err = redis.Scan(values, &cursor, &res)
if err != nil {
return nil, cursor, err
} else {
result = make(map[string]interface{})
length := len(res) / 2
for i := 0; i < length; i++ {
key := res[2*i]
val := res[2*i+1]
result[string(key.([]byte))] = val
}
return result, cursor, nil
}
}
func (rc *RedisCache) HGetAll(key string) (map[string]interface{}, error) {
result, err := redis.Values(rc.SafeDo("HGETALL", key))
if err != nil {
return nil, err
} else {
if len(result) <= 0 {
return nil, nil
}
res := make(map[string]interface{})
length := len(result) / 2
for i := 0; i < length; i++ {
key := result[2*i]
val := result[2*i+1]
res[string(key.([]byte))] = val
}
return res, nil
}
}
func (rc *RedisCache) HDelMulti(key string, fields []interface{}) error {
conn := rc.pool().Get()
defer conn.Close()
for _, field := range fields {
conn.Send("HDEL", key, field)
}
return conn.Flush()
}
func (rc *RedisCache) HMSet(key string, val interface{}) (err error) {
conn := rc.pool().Get()
defer conn.Close()
err = conn.Send("HMSET", redis.Args{}.Add(key).AddFlat(val)...)
if err != nil {
return
}
return conn.Flush()
}
func (rc *RedisCache) HMGet(key string, fields []string) ([]interface{}, error) {
return redis.Values(rc.SafeDo("HMGET", redis.Args{}.Add(key).AddFlat(fields)...))
}
// Copyright (C) 2020 Finogeeks Co., Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cache
import (
"github.com/gomodule/redigo/redis"
)
func Int64(reply interface{}, err error) (int64, error) {
return redis.Int64(reply, err)
}
func String(reply interface{}, err error) (string, error) {
return redis.String(reply, err)
}
func Bool(reply interface{}, err error) (bool, error) {
return redis.Bool(reply, err)
}
// Copyright (C) 2020 Finogeeks Co., Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cache
import (
"github.com/finogeeks/ligase/model/service"
log "github.com/finogeeks/ligase/skunkworks/log"
"sync"
"sync/atomic"
"time"
)
type LocalCacheRepo struct {
repos []*sync.Map
nameMap sync.Map
offset int
regMtx sync.Mutex
cap int
cnt int32
duration int32
cleanIdx int32
ticker *time.Timer
cleanBukSize int32
cleanStep int32
cleanCap int32
cleanItems []*cleanItem
liveCnt int
}
type cleanItem struct {
mux sync.Mutex
cleanBuk []*sync.Map
}
func (ci *cleanItem) init(size int32) {
ci.cleanBuk = make([]*sync.Map, size)
for i := int32(0); i < size; i++ {
ci.cleanBuk[i] = new(sync.Map)
}
}
// cap: total clean span in ticks (bucket_count * step)
// step: width of one clean bucket in ticks
// cur: the cleaner's current tick
// target: the tick at which the item should next expire
// old: the item's previous expiry tick (-1 means newly inserted)
// A tick maps to a bucket via (tick % cap) / step; e.g. with 16 buckets and
// step 20 (cap 320), tick 70 lands in bucket (70 % 320) / 20 = 3.
func (ci *cleanItem) update(lc *LocalCacheRepo, item *service.CacheItem, cap, step, cur, target int32) {
tgtSlot := (target % cap) / step
old := atomic.LoadInt32(&(item.Offset))
oldSlot := (old % cap) / step
curSlot := (cur % cap) / step
key := item.Key
repo := lc.repos[item.Repo]
if _, ok := repo.Load(key); !ok {
repo.Store(key, item)
log.Errorf("update item may missing key:%s repo:%s", key, lc.GetRegisterReverse(item.Repo))
}
if (oldSlot == tgtSlot) && (old != -1) {
if !atomic.CompareAndSwapInt32(&(item.Offset), old, target) {
log.Errorf("update swap key:%s old:%d target:%d actual: %d repo:%s", key, old, target, atomic.LoadInt32(&(item.Offset)), lc.GetRegisterReverse(item.Repo))
}
return
}
item.Mux.Lock()
defer item.Mux.Unlock()
old = atomic.LoadInt32(&(item.Offset))
oldSlot = (old % cap) / step
ci.cleanBuk[tgtSlot].Store(key, true)
//log.Errorf("update set clean store key:%s slot:%d cur:%d repo:%s\, key, tgtSlot, curSlot, lc.GetRegisterReverse(item.repo))
if old == -1 {
//log.Errorf("add item key:%s slot:%d repo:%s", key, tgtSlot, lc.GetRegisterReverse(item.repo))
return
}
atomic.StoreInt32(&(item.Offset), target)
if curSlot == oldSlot {
ci.mux.Lock()
repo := lc.repos[item.Repo]
if _, ok := repo.Load(key); !ok {
repo.Store(key, item)
}
ci.cleanBuk[curSlot].Delete(key)
//log.Errorf("update set clean del key:%s slot:%d cur:%d repo:%s", key, curSlot, curSlot, lc.GetRegisterReverse(item.repo))
ci.mux.Unlock()
} else {
ci.cleanBuk[oldSlot].Delete(key)
//log.Errorf("update set clean del key:%s slot:%d cur:%d repo:%s", key, oldSlot, curSlot, lc.GetRegisterReverse(item.repo))
}
}
func (lc *LocalCacheRepo) Register(repoName string) int {
val, ok := lc.nameMap.Load(repoName)
if ok {
return val.(int)
} else {
lc.regMtx.Lock()
defer lc.regMtx.Unlock()
ret := lc.offset
if ret >= lc.cap {
log.Fatalf("Register %s exceed repo capacity %d", repoName, lc.cap)
return -1
} else {
lc.offset++
lc.nameMap.Store(repoName, ret)
return ret
}
}
}
func (lc *LocalCacheRepo) GetRegister(repoName string) int {
if repoName == "" {
log.Panic("can't register without name")
}
val, ok := lc.nameMap.Load(repoName)
if ok {
return val.(int)
} else {
return -1
}
}
func (lc *LocalCacheRepo) GetRegisterReverse(repoName int) string {
res := ""
lc.nameMap.Range(func(key, value interface{}) bool {
if value.(int) == repoName {
res = key.(string)
}
return true
})
return res
}
func (lc *LocalCacheRepo) Put(repoName int, key, val interface{}) *service.CacheItem {
if val == nil {
log.Panic("Put nil value")
}
return lc.putWithDuration(repoName, key, val, lc.duration)
}
func (lc *LocalCacheRepo) Tie(repoName int, key interface{}, ref *service.CacheItem) {
if repoName < 0 || repoName >= lc.cap {
return
}
if ref.Ref != nil {
log.Panic("can't ref more than 1 level")
}
repo := lc.repos[repoName]
item, ok := repo.Load(key)
if ok {
item.(*service.CacheItem).Ref = ref
}
}
/*
func (lc *LocalCacheRepo) Touch(repoName int, key interface{}) {
if repoName < 0 || repoName >= lc.cap {
return
}
repo := lc.repos[repoName]
item, ok := repo.Load(key)
if ok {
atomic.StoreInt32(&item.(*CacheItem).offset, atomic.LoadInt32(&lc.cnt)+lc.duration)
}
}
*/
// duration = -1 means the entry is never cleaned by the timer
func (lc *LocalCacheRepo) putWithDuration(repoName int, key, val interface{}, duration int32) *service.CacheItem {
if repoName < 0 || repoName >= lc.cap {
log.Errorf("putWithDuration range error repo:%s key %v", lc.GetRegisterReverse(repoName), key)
return nil
}
repo := lc.repos[repoName]
item, ok := repo.Load(key)
cnt := atomic.LoadInt32(&lc.cnt)
ci := lc.cleanItems[repoName]
if ok {
item.(*service.CacheItem).Val = val
ref := item.(*service.CacheItem).Ref
target := cnt + duration
if target != atomic.LoadInt32(&item.(*service.CacheItem).Offset) {
ci.update(lc, item.(*service.CacheItem), lc.cleanCap, lc.cleanStep, cnt, target)
}
if (ref != nil) && (target != atomic.LoadInt32(&(ref.Offset))) {
ci = lc.cleanItems[ref.Repo]
ci.update(lc, ref, lc.cleanCap, lc.cleanStep, cnt, target)
}
return item.(*service.CacheItem)
}
itemc := new(service.CacheItem)
itemc.Val = val
itemc.Key = key
itemc.Offset = -1
itemc.Ref = nil
itemc.Repo = repoName
repo.Store(key, itemc)
ci.update(lc, itemc, lc.cleanCap, lc.cleanStep, cnt, cnt+duration)
return itemc
}
func (lc *LocalCacheRepo) Get(repoName int, key interface{}) (interface{}, bool) {
return lc.getWithDuration(repoName, key, lc.duration)
}
func (lc *LocalCacheRepo) getWithDuration(repoName int, key interface{}, duration int32) (interface{}, bool) {
if repoName < 0 || repoName >= lc.cap {
log.Errorf("getWithDuration range error repo:%s key %v", lc.GetRegisterReverse(repoName), key)
return nil, false
}
repo := lc.repos[repoName]
item, ok := repo.Load(key)
if ok {
if item.(*service.CacheItem).Val == nil {
log.Panic("getWithDuration hit but val nil")
}
cnt := atomic.LoadInt32(&lc.cnt)
ref := item.(*service.CacheItem).Ref
target := cnt + duration
if target != atomic.LoadInt32(&item.(*service.CacheItem).Offset) {
ci := lc.cleanItems[repoName]
ci.update(lc, item.(*service.CacheItem), lc.cleanCap, lc.cleanStep, cnt, target)
}
if (ref != nil) && (target != atomic.LoadInt32(&(ref.Offset))) {
ci := lc.cleanItems[ref.Repo]
ci.update(lc, ref, lc.cleanCap, lc.cleanStep, cnt, target)
}
return item.(*service.CacheItem).Val, true
}
return nil, false
}
/*
func (lc *LocalCacheRepo) Remove(repoName int, key interface{}) {
if repoName < 0 || repoName >= lc.cap {
return
}
repo := lc.repos[repoName]
repo.Delete(key)
}
*/
func (lc *LocalCacheRepo) Start(cap, duration int) {
lc.cnt = 0
lc.duration = 300
lc.cleanBukSize = 16
if duration != 0 {
lc.duration = int32(duration)
if lc.duration < lc.cleanBukSize-1 {
lc.duration = lc.cleanBukSize - 1
}
}
lc.ticker = time.NewTimer(0)
lc.cap = cap
lc.repos = make([]*sync.Map, lc.cap)
for i := 0; i < lc.cap; i++ {
lc.repos[i] = new(sync.Map)
}
lc.cleanStep = lc.duration / (lc.cleanBukSize - 1)
lc.cleanCap = lc.cleanBukSize * lc.cleanStep
lc.cleanIdx = (lc.cleanBukSize - 1) * lc.cleanStep
lc.cleanItems = make([]*cleanItem, lc.cap) // one cleanItem per repo; each holds cleanBukSize buckets
for i := 0; i < lc.cap; i++ {
lc.cleanItems[i] = new(cleanItem)
lc.cleanItems[i].init(lc.cleanBukSize)
}
go lc.clean()
}
func (lc *LocalCacheRepo) clean() {
clean := lc.cleanIdx
for {
select {
case <-lc.ticker.C:
cnt := atomic.AddInt32(&lc.cnt, 1)
lc.ticker.Reset(time.Second)
if cnt == clean {
//log.Warn("local_cache start clean")
//repoCount := 0
//cleanRepoCount := 0
for i := 0; i < lc.cap; i++ {
repo := lc.repos[i]
ci := lc.cleanItems[i]
slot := (cnt % lc.cleanCap) / lc.cleanStep
cleanRepo := ci.cleanBuk[slot]
ci.mux.Lock()
cleanRepo.Range(func(key, value interface{}) bool {
if value != nil {
//log.Errorf("try key:%s slot:%d repo:%s %v %v clean:%d", key, slot, lc.GetRegisterReverse(i), value, ok, clean)
repo.Delete(key)
//log.Errorf("clean routine del item key:%s slot:%d repo:%s", key, slot, lc.GetRegisterReverse(i))
//repoCount++
cleanRepo.Delete(key)
//log.Errorf("clean routine clean del key:%s slot:%d repo:%s", key, slot, lc.GetRegisterReverse(i))
} else {
//log.Panic("clean routine del item key:%s slot:%d repo:%s but value nil", key, slot, lc.GetRegisterReverse(i))
}
//cleanRepoCount++
return true
})
ci.mux.Unlock()
}
//log.Warn("local_cache start clean finished: ", "repoCount: ", repoCount, " cleanRepoCount:", cleanRepoCount)
clean = atomic.AddInt32(&lc.cleanIdx, lc.cleanStep)
}
//log.Errorf("cnt:%d idx:%d step:%d cap:%d", cnt, lc.cleanIdx, lc.cleanStep, lc.cleanCap)
}
}
}
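// Usage sketch (editor's addition, hypothetical names): a repo is registered
// once, then items are stored and fetched by (repo id, key); an entry's expiry
// is pushed out by `duration` ticks on every Put/Get that touches it.
func exampleLocalCacheUsage() {
    lc := new(LocalCacheRepo)
    lc.Start(4, 60) // capacity for 4 repos, ~60s retention
    repoID := lc.Register("rooms")
    lc.Put(repoID, "!room:example.org", "cached-value")
    if v, ok := lc.Get(repoID, "!room:example.org"); ok {
        log.Infof("cache hit: %v", v)
    }
}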
// Copyright (C) 2020 Finogeeks Co., Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cache
import (
"crypto/rand"
"encoding/base64"
"errors"
"fmt"
"github.com/finogeeks/ligase/skunkworks/log"
"github.com/gomodule/redigo/redis"
"time"
)
func (rc *RedisCache) Lock(lockKey string, expire, wait int) (lockToken string, err error) {
b := make([]byte, 16)
if _, err = rand.Read(b); err != nil {
return "", err
}
token := base64.StdEncoding.EncodeToString(b)
endTime := time.Now().Add(time.Duration(wait) * time.Second).UnixNano()
retry := 1
if wait < 0 {
locked := rc.doLock(lockKey, token, expire, retry)
if locked {
return token, nil
}
} else if wait == 0 {
for {
locked := rc.doLock(lockKey, token, expire, retry)
if locked {
return token, nil
}
retry++
}
} else {
for time.Now().UnixNano() <= endTime {
locked := rc.doLock(lockKey, token, expire, retry)
if locked {
return token, nil
}
retry++
}
}
ttl, err := rc.TTL(lockKey)
if err != nil {
log.Warnf("get lock key:%s ttl failed with err:%v", lockKey, err)
}
return "", errors.New(fmt.Sprintf("lock key:%s failed with max retry timeout, lock will expire in %d s", lockKey, ttl))
}
func (rc *RedisCache) doLock(lockKey, token string, expire, retry int) bool {
if retry > 1 { // sleep if not first time
time.Sleep(100 * time.Millisecond)
}
v, err := rc.SafeDo("SET", lockKey, token, "EX", expire, "NX")
if err == nil {
if v == nil {
log.Warnf("lock key:%s failed, get times:%d", lockKey, retry)
} else {
log.Infof("lock key:%s success, get times:%d", lockKey, retry)
return true
}
} else {
log.Warnf("lock key:%s failed with redis err:%v", lockKey, err)
}
return false
}
func (rc *RedisCache) UnLock(lockKey, token string, force bool) (err error) {
if force {
return rc.Del(lockKey)
}
val, err := rc.Get(lockKey)
if err != nil {
log.Errorf("unlock get lock key:%s token faild with redis err:%v", lockKey, err)
return err
}
lockToken, e := String(val, err)
if e != nil {
if e == redis.ErrNil {
log.Warnf("unlock parse lock key:%s token has expired", lockKey)
} else {
log.Errorf("unlock parse lock key:%s token faild with err:%v", lockKey, e)
}
return e
}
if lockToken != token {
return errors.New(fmt.Sprintf("unlock key:%s token:%s is not equal lock token:%s unlock failed", lockKey, token, lockToken))
}
return rc.Del(lockKey)
}
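// Usage sketch (editor's addition, hypothetical key name): acquire the lock,
// do the critical work, then release with the returned token so that a lock
// which expired and was re-acquired by another caller is never deleted by
// mistake (force=false compares tokens before deleting).
func withLock(rc *RedisCache, key string, critical func() error) error {
    token, err := rc.Lock(key, 10 /*expire seconds*/, 3 /*wait seconds*/)
    if err != nil {
        return err
    }
    defer func() {
        if e := rc.UnLock(key, token, false); e != nil {
            log.Warnf("unlock key:%s failed: %v", key, e)
        }
    }()
    return critical()
}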
// Copyright (C) 2020 Finogeeks Co., Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cache
import (
"fmt"
)
func (rc *RedisCache) GetAlias(key string) (string, error) {
key = fmt.Sprintf("roomalias:%s", key)
return rc.GetString(key)
}
func (rc *RedisCache) SetAlias(key, val string, expire int64) error {
key = fmt.Sprintf("roomalias:%s", key)
return rc.Set(key, val, expire)
}
func (rc *RedisCache) AliasExists(key string) (bool, error) {
key = fmt.Sprintf("roomalias:%s", key)
return rc.Exists(key)
}
func (rc *RedisCache) DelAlias(key string) error {
key = fmt.Sprintf("roomalias:%s", key)
return rc.Del(key)
}
// Copyright (C) 2020 Finogeeks Co., Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cache
import (
"errors"
"fmt"
"github.com/finogeeks/ligase/model/types"
"github.com/gomodule/redigo/redis"
"strings"
)
const (
ROOM_STATE_PREFIX = "room_state"
ROOM_STATE_EXT_PREFIX = "room_state_ext"
EXT_DOMAIN_PROFIX = "domain:"
)
func (rc *RedisCache) SetRoomState(roomID string, state []byte, token string) error {
key := fmt.Sprintf("%s:%s", ROOM_STATE_PREFIX, roomID)
val := make(map[string]interface{})
val["state"] = state
val["token"] = token
return rc.HMSet(key, val)
}
func (rc *RedisCache) GetRoomState(roomID string) ([]byte, string, error) {
key := fmt.Sprintf("%s:%s", ROOM_STATE_PREFIX, roomID)
reply, err := rc.HMGet(key, []string{"state", "token"})
if err != nil {
return nil, "", err
}
var bytes []byte
var token string
_, err = redis.Scan(reply, &bytes, &token)
return bytes, token, err
}
func (rc *RedisCache) GetRoomStateExt(roomID string) (*types.RoomStateExt, error) {
key := fmt.Sprintf("%s:%s", ROOM_STATE_EXT_PREFIX, roomID)
state, err := rc.HGetAll(key)
if err != nil {
return nil, err
}
if state == nil {
return nil, nil
}
ext := &types.RoomStateExt{
Domains: make(map[string]int64),
}
for k, v := range state {
switch k {
case "pre_state_id":
ext.PreStateId, _ = String(v, nil)
case "last_state_id":
ext.LastStateId, _ = String(v, nil)
case "pre_msg_id":
ext.PreMsgId, _ = String(v, nil)
case "last_msg_id":
ext.LastMsgId, _ = String(v, nil)
case "depth":
ext.Depth, _ = Int64(v, nil)
case "has_update":
ext.HasUpdate, _ = Bool(v, nil)
case "out_event_offset":
ext.OutEventOffset, _ = Int64(v, nil)
default:
if strings.HasPrefix(k, EXT_DOMAIN_PROFIX) {
ext.Domains[strings.TrimPrefix(k, EXT_DOMAIN_PROFIX)], _ = Int64(v, nil)
}
}
}
return ext, nil
}
func (rc *RedisCache) UpdateRoomStateExt(roomID string, ext map[string]interface{}) error {
key := fmt.Sprintf("%s:%s", ROOM_STATE_EXT_PREFIX, roomID)
return rc.HMSet(key, ext)
}
func (rc *RedisCache) SetRoomStateExt(roomID string, roomstateExt *types.RoomStateExt) error {
if roomstateExt == nil {
return errors.New("set roomstate ext nil")
}
ext := make(map[string]interface{})
ext["pre_state_id"] = roomstateExt.PreStateId
ext["last_state_id"] = roomstateExt.LastStateId
ext["pre_msg_id"] = roomstateExt.PreMsgId
ext["last_msg_id"] = roomstateExt.LastMsgId
ext["depth"] = roomstateExt.Depth
ext["has_update"] = roomstateExt.HasUpdate
ext["out_event_offset"] = roomstateExt.OutEventOffset
for k, v := range roomstateExt.Domains {
ext[EXT_DOMAIN_PROFIX+k] = v
}
key := fmt.Sprintf("%s:%s", ROOM_STATE_EXT_PREFIX, roomID)
return rc.HMSet(key, ext)
}
// Copyright (C) 2020 Finogeeks Co., Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cache
import (
"fmt"
"strconv"
"strings"
"time"
)
const (
TXN_DURATION = 30
)
func (rc *RedisCache) GetTxnID(roomID, msgID string) (string, bool) {
key := fmt.Sprintf("msgid:%s", roomID)
value, err := rc.HGetString(key, msgID)
if err != nil {
return "", false
}
s := strings.Split(value, ":")
if len(s) <= 1 {
rc.HDel(key, msgID)
return "", false
}
ts, err := strconv.ParseInt(s[0], 10, 64)
if err != nil {
rc.HDel(key, msgID)
return "", false
}
if time.Now().Unix()-ts > TXN_DURATION {
rc.HDel(key, msgID)
return "", false
} else {
return strings.Join(s[1:], ":"), true
}
}
func (rc *RedisCache) PutTxnID(roomID, txnID, eventID string) error {
key := fmt.Sprintf("msgid:%s", roomID)
return rc.HSet(key, txnID, fmt.Sprintf("%d:%s", time.Now().Unix(), eventID))
}
func (rc *RedisCache) ScanTxnID(cursor uint64, count int) ([]string, uint64, error) {
match := "msgid:*"
rs, next, err := rc.Scan(cursor, match, count)
result := []string{}
for _, r := range rs {
result = append(result, string(r.([]byte)))
}
return result, next, err
}
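// Editor's note: PutTxnID stores the value as "<unix-ts>:<eventID>" and
// GetTxnID drops entries older than TXN_DURATION (30s). A hedged round-trip
// sketch with hypothetical IDs:
//
//	rc.PutTxnID("!room:example.org", "txn1", "$event1:example.org")
//	if evID, ok := rc.GetTxnID("!room:example.org", "txn1"); ok {
//	    // within 30s of the Put: evID == "$event1:example.org"
//	}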
// Copyright (C) 2020 Finogeeks Co., Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cache
import (
"fmt"
)
func (rc *RedisCache) SetUserRoomMemberShip(roomID, userID string, mType int64) error {
key := fmt.Sprintf("membership:%s", userID)
return rc.HSet(key, roomID, mType)
}
func (rc *RedisCache) SetUserRoomMemberShipMulti(userID string, memberships map[string]int64) error {
key := fmt.Sprintf("membership:%s", userID)
return rc.HMSet(key, memberships)
}
func (rc *RedisCache) GetUserRoomMemberShip(userID string) (map[string]int64, error) {
key := fmt.Sprintf("membership:%s", userID)
result, err := rc.HGetAll(key)
if err != nil {
return nil, err
} else {
if result == nil {
return nil, nil
}
r := make(map[string]int64)
for k, v := range result {
r[k], _ = Int64(v, nil)
}
return r, nil
}
}
func (rc *RedisCache) CheckUserRoomMemberShipExists(userID string) (bool, error) {
key := fmt.Sprintf("membership:%s", userID)
return rc.Exists(key)
}
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
// Modifications copyright (C) 2020 Finogeeks Co., Ltd
package cachewriter
import (
"github.com/finogeeks/ligase/cachewriter/consumers"
"github.com/finogeeks/ligase/common/basecomponent"
"github.com/finogeeks/ligase/skunkworks/log"
)
func SetupCacheWriterComponent(
base *basecomponent.BaseDendrite,
) {
dbEvConsumer := consumers.NewDBEventCacheConsumer(base.Cfg)
if err := dbEvConsumer.Start(); err != nil {
log.Panicw("failed to start cache data consumer", log.KeysAndValues{"error", err})
}
}
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
// Modifications copyright (C) 2020 Finogeeks Co., Ltd
package consumers
import (
"context"
"fmt"
"github.com/finogeeks/ligase/common"
"time"
"github.com/finogeeks/ligase/common/config"
"github.com/finogeeks/ligase/model/dbtypes"
"github.com/finogeeks/ligase/skunkworks/log"
)
func init() {
Register(dbtypes.CATEGORY_ACCOUNT_DB_EVENT, NewAccountDBEvCacheConsumer)
}
type AccountDBEvCacheConsumer struct {
pool PoolProviderInterface
//msgChan chan *dbtypes.DBEvent
msgChan chan common.ContextMsg
}
func (s *AccountDBEvCacheConsumer) startWorker(msgChan chan common.ContextMsg) {
var res error
for msg := range msgChan {
output := msg.Msg.(*dbtypes.DBEvent)
start := time.Now().UnixNano() / 1000000
key := output.Key
data := output.AccountDBEvents
switch key {
case dbtypes.AccountDataInsertKey:
res = s.OnInsertAccountData(data.AccountDataInsert)
case dbtypes.FilterInsertKey:
res = s.OnInsertFilter(data.FilterInsert)
case dbtypes.ProfileInsertKey:
res = s.OnUpsertProfile(data.ProfileInsert)
case dbtypes.ProfileInitKey:
res = s.OnInitProfile(data.ProfileInsert)
case dbtypes.RoomTagInsertKey:
res = s.OnInsertRoomTag(data.RoomTagInsert)
case dbtypes.RoomTagDeleteKey:
res = s.OnDeleteRoomTag(data.RoomTagDelete)
case dbtypes.UserInfoInsertKey:
res = s.OnUpsertUserInfo(data.UserInfoInsert)
case dbtypes.UserInfoInitKey:
res = s.OnInitUserInfo(data.UserInfoInsert)
case dbtypes.UserInfoDeleteKey:
res = s.OnDeleteUserInfo(data.UserInfoDelete)
default:
res = nil
log.Infow("account db event: ignoring unknown output type", log.KeysAndValues{"key", output.Key})
}
if res != nil {
log.Errorf("write account db event to cache error %v key: %s", res, dbtypes.AccountDBEventKeyToStr(key))
}
now := time.Now().UnixNano() / 1000000
log.Infof("AccountDBEvCacheConsumer process %s takes %d", dbtypes.AccountDBEventKeyToStr(key), now-start)
}
}
func NewAccountDBEvCacheConsumer() ConsumerInterface {
s := new(AccountDBEvCacheConsumer)
s.msgChan = make(chan common.ContextMsg, 4096)
return s
}
func (s *AccountDBEvCacheConsumer) SetPool(pool PoolProviderInterface) {
s.pool = pool
}
func (s *AccountDBEvCacheConsumer) Prepare(cfg *config.Dendrite) {
}
func (s *AccountDBEvCacheConsumer) Start() {
go s.startWorker(s.msgChan)
}
func (s *AccountDBEvCacheConsumer) OnMessage(ctx context.Context, dbEv *dbtypes.DBEvent) error {
s.msgChan <- common.ContextMsg{Ctx: ctx, Msg: dbEv}
return nil
}
func (s *AccountDBEvCacheConsumer) OnInsertAccountData(
msg *dbtypes.AccountDataInsert,
) error {
conn := s.pool.Pool().Get()
defer conn.Close()
if msg.RoomID != "" {
roomAccountDatKey := fmt.Sprintf("%s:%s:%s:%s", "room_account_data", msg.UserID, msg.RoomID, msg.Type)
err := conn.Send("hmset", roomAccountDatKey, "user_id", msg.UserID, "room_id", msg.RoomID, "type", msg.Type, "content", msg.Content)
if err != nil {
return err
}
err = conn.Send("hmset", fmt.Sprintf("%s:%s", "room_account_data_list", msg.UserID), roomAccountDatKey, roomAccountDatKey)
if err != nil {
return err
}
} else {
accountDatKey := fmt.Sprintf("%s:%s:%s", "account_data", msg.UserID, msg.Type)
err := conn.Send("hmset", accountDatKey, "user_id", msg.UserID, "type", msg.Type, "content", msg.Content)
if err != nil {
return err
}
err = conn.Send("hmset", fmt.Sprintf("%s:%s", "account_data_list", msg.UserID), accountDatKey, accountDatKey)
if err != nil {
return err
}
}
return conn.Flush()
}
func (s *AccountDBEvCacheConsumer) OnInsertFilter(
msg *dbtypes.FilterInsert,
) error {
conn := s.pool.Pool().Get()
defer conn.Close()
err := conn.Send("hmset", fmt.Sprintf("%s:%s:%s", "filter", msg.UserID, msg.FilterID), "user_id", msg.UserID, "id", msg.FilterID, "filter", msg.Filter)
if err != nil {
return err
}
/*filterHash := fn.GetStringHash(msg.Filter)
err = conn.Send("hmset", fmt.Sprintf("%s:%s:%s", "filter_content", msg.UserID, filterHash), "user_id", msg.UserID, "id", msg.FilterID, "filter", msg.Filter)
if err != nil {
return err
}*/
return conn.Flush()
}
func (s *AccountDBEvCacheConsumer) OnUpsertProfile(
msg *dbtypes.ProfileInsert,
) error {
conn := s.pool.Pool().Get()
defer conn.Close()
err := conn.Send("hmset", fmt.Sprintf("%s:%s", "profile", msg.UserID), "user_id", msg.UserID, "display_name", msg.DisplayName, "avatar_url", msg.AvatarUrl)
if err != nil {
return err
}
return conn.Flush()
}
func (s *AccountDBEvCacheConsumer) OnInitProfile(
msg *dbtypes.ProfileInsert,
) error {
conn := s.pool.Pool().Get()
defer conn.Close()
err := conn.Send("hmset", fmt.Sprintf("%s:%s", "profile", msg.UserID), "user_id", msg.UserID)
if err != nil {
return err
}
return conn.Flush()
}
func (s *AccountDBEvCacheConsumer) OnInsertRoomTag(
msg *dbtypes.RoomTagInsert,
) error {
conn := s.pool.Pool().Get()
defer conn.Close()
tagKey := fmt.Sprintf("%s:%s:%s:%s", "room_tags", msg.UserID, msg.RoomID, msg.Tag)
err := conn.Send("hmset", tagKey, "user_id", msg.UserID, "room_id", msg.RoomID, "tag", msg.Tag, "content", msg.Content)
if err != nil {
return err
}
err = conn.Send("hmset", fmt.Sprintf("%s:%s", "user_tags_list", msg.UserID), tagKey, tagKey)
if err != nil {
return err
}
err = conn.Send("hmset", fmt.Sprintf("%s:%s:%s", "room_tags_list", msg.UserID, msg.RoomID), tagKey, tagKey)
if err != nil {
return err
}
return conn.Flush()
}
func (s *AccountDBEvCacheConsumer) OnDeleteRoomTag(
msg *dbtypes.RoomTagDelete,
) error {
conn := s.pool.Pool().Get()
defer conn.Close()
tagKey := fmt.Sprintf("%s:%s:%s:%s", "room_tags", msg.UserID, msg.RoomID, msg.Tag)
err := conn.Send("del", tagKey)
if err != nil {
return err
}
err = conn.Send("hdel", fmt.Sprintf("%s:%s", "user_tags_list", msg.UserID), tagKey)
if err != nil {
return err
}
err = conn.Send("hdel", fmt.Sprintf("%s:%s:%s", "room_tags_list", msg.UserID, msg.RoomID), tagKey)
if err != nil {
return err
}
return conn.Flush() // flush the pipelined del/hdel commands
}
func (s *AccountDBEvCacheConsumer) OnUpsertUserInfo(
msg *dbtypes.UserInfoInsert,
) error {
conn := s.pool.Pool().Get()
defer conn.Close()
err := conn.Send("hmset", fmt.Sprintf("%s:%s", "user_info", msg.UserID), "user_id", msg.UserID, "user_name", msg.UserName, "job_number", msg.JobNumber, "mobile", msg.Mobile, "landline", msg.Landline, "email", msg.Email)
if err != nil {
return err
}
return conn.Flush()
}
func (s *AccountDBEvCacheConsumer) OnInitUserInfo(
msg *dbtypes.UserInfoInsert,
) error {
conn := s.pool.Pool().Get()
defer conn.Close()
err := conn.Send("hmset", fmt.Sprintf("%s:%s", "user_info", msg.UserID), "user_id", msg.UserID)
if err != nil {
return err
}
return conn.Flush()
}
func (s *AccountDBEvCacheConsumer) OnDeleteUserInfo(
msg *dbtypes.UserInfoDelete,
) error {
conn := s.pool.Pool().Get()
defer conn.Close()
UserInfokey := fmt.Sprintf("%s:%s", "user_info", msg.UserID)
err := conn.Send("del", UserInfokey)
if err != nil {
return err
}
return conn.Flush() // flush the pipelined del command
}
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
// Modifications copyright (C) 2020 Finogeeks Co., Ltd
package consumers
import (
"context"
"math/rand"
"sync"
"time"
"github.com/finogeeks/ligase/common"
"github.com/finogeeks/ligase/common/config"
"github.com/finogeeks/ligase/core"
"github.com/finogeeks/ligase/model/dbtypes"
"github.com/gomodule/redigo/redis"
jsoniter "github.com/json-iterator/go"
log "github.com/finogeeks/ligase/skunkworks/log"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
// DBEventCacheConsumer consumes db events and writes them through to the redis cache.
type DBEventCacheConsumer struct {
channel core.IChannel
pools []*redis.Pool
poolSize int
consumerRepo sync.Map
}
// NewDBEventCacheConsumer creates a new cache-writer consumer. Call Start() to begin consuming db events.
func NewDBEventCacheConsumer(
cfg *config.Dendrite,
) *DBEventCacheConsumer {
val, ok := common.GetTransportMultiplexer().GetChannel(
cfg.Kafka.Consumer.CacheUpdates.Underlying,
cfg.Kafka.Consumer.CacheUpdates.Name,
)
if ok {
channel := val.(core.IChannel)
s := &DBEventCacheConsumer{
channel: channel,
}
channel.SetHandler(s)
s.poolSize = len(cfg.Redis.Uris)
s.pools = make([]*redis.Pool, s.poolSize)
for i := 0; i < s.poolSize; i++ {
addr := cfg.Redis.Uris[i]
s.pools[i] = &redis.Pool{
MaxIdle: 200,
MaxActive: 200,
Wait: true,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) { return redis.DialURL(addr) },
}
}
// instantiate every registered consumer
for key, f := range newHandler {
instance := f()
instance.SetPool(s)
instance.Prepare(cfg)
log.Infof("NewDBEventCacheConsumer: load instance %s", dbtypes.DBCategoryToStr(key))
s.consumerRepo.Store(key, instance)
}
return s
}
return nil
}
func (s *DBEventCacheConsumer) Pool() *redis.Pool {
slot := rand.Int() % s.poolSize
return s.pools[slot]
}
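// Editor's note: Pool picks one of the configured redis pools uniformly at
// random per call, which assumes the URIs in cfg.Redis.Uris are
// interchangeable endpoints for the same logical cache.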
// Start starts every registered cache consumer.
func (s *DBEventCacheConsumer) Start() error {
s.consumerRepo.Range(func(key, value interface{}) bool {
handler := value.(ConsumerInterface)
handler.Start()
return true
})
//s.channel.Start()
return nil
}
func (s *DBEventCacheConsumer) OnMessage(ctx context.Context, topic string, partition int32, data []byte, rawMsg interface{}) {
var output dbtypes.DBEvent
if err := json.Unmarshal(data, &output); err != nil {
log.Errorw("dbevent: message parse failure", log.KeysAndValues{"error", err})
return
}
category := output.Category
val, ok := s.consumerRepo.Load(category)
if ok {
consumer := val.(ConsumerInterface)
consumer.OnMessage(ctx, &output)
}
}
// Copyright (C) 2020 Finogeeks Co., Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License, version 3,
// as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package consumers
import (
"context"
"github.com/finogeeks/ligase/common/config"
"github.com/finogeeks/ligase/model/dbtypes"
log "github.com/finogeeks/ligase/skunkworks/log"
"github.com/gomodule/redigo/redis"
"sync"
)
var regMu sync.RWMutex
var newHandler = make(map[int64]func() ConsumerInterface)
type ConsumerInterface interface {
OnMessage(context.Context, *dbtypes.DBEvent) error
Prepare(*config.Dendrite)
SetPool(PoolProviderInterface)
Start()
}
func Register(name int64, f func() ConsumerInterface) {
regMu.Lock()
defer regMu.Unlock()
if f == nil {
log.Panicf("Register: %s func nil", dbtypes.DBCategoryToStr(name))
}
if _, ok := newHandler[name]; ok {
log.Panicf("Register: %s already registered", dbtypes.DBCategoryToStr(name))
}
newHandler[name] = f
}
type PoolProviderInterface interface {
Pool() *redis.Pool
}
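// Editor's note: consumers register themselves at init() time, e.g.
// Register(dbtypes.CATEGORY_ACCOUNT_DB_EVENT, NewAccountDBEvCacheConsumer);
// NewDBEventCacheConsumer then instantiates every registered constructor and
// dispatches each incoming DBEvent to the consumer matching its Category.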
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
// Modifications copyright (C) 2020 Finogeeks Co., Ltd
package consumers
import (
"context"
"fmt"
"github.com/finogeeks/ligase/common"
"github.com/gomodule/redigo/redis"
"time"
"github.com/finogeeks/ligase/common/config"
"github.com/finogeeks/ligase/model/dbtypes"
"github.com/finogeeks/ligase/skunkworks/log"
)
func init() {
Register(dbtypes.CATEGORY_DEVICE_DB_EVENT, NewDeviceDBEvCacheConsumer)
}
type DeviceDBEvCacheConsumer struct {
pool PoolProviderInterface
//msgChan chan *dbtypes.DBEvent
msgChan chan common.ContextMsg
}
func (s *DeviceDBEvCacheConsumer) startWorker(msgChan chan common.ContextMsg) {
var res error
for msg := range msgChan {
output := msg.Msg.(*dbtypes.DBEvent)
start := time.Now().UnixNano() / 1000000
key := output.Key
data := output.DeviceDBEvents
switch key {
case dbtypes.DeviceInsertKey:
res = s.onDeviceInsert(*data.DeviceInsert)
case dbtypes.DeviceDeleteKey:
res = s.onDeviceDelete(*data.DeviceDelete)
case dbtypes.MigDeviceInsertKey:
res = s.onMigDeviceInsert(*data.MigDeviceInsert)
case dbtypes.DeviceRecoverKey:
res = s.onDeviceRecover(*data.DeviceInsert)
case dbtypes.DeviceUpdateTsKey:
res = s.onUpdateDeviceActiveTs(*data.DeviceUpdateTs)
default:
res = nil
log.Infow("device db event: ignoring unknown output type", log.KeysAndValues{"key", output.Key})
}
if res != nil {
log.Errorf("write device db event to cache error %v key: %s", res, dbtypes.DeviceDBEventKeyToStr(key))
}
now := time.Now().UnixNano() / 1000000
log.Infof("DeviceDBEvCacheConsumer process %s takes %d", dbtypes.DeviceDBEventKeyToStr(key), now-start)
}
}
// NewDeviceDBEvCacheConsumer creates a new device db event consumer. Call Start() to begin consuming db events.
func NewDeviceDBEvCacheConsumer() ConsumerInterface {
s := new(DeviceDBEvCacheConsumer)
s.msgChan = make(chan common.ContextMsg, 1024)
return s
}
func (s *DeviceDBEvCacheConsumer) SetPool(pool PoolProviderInterface) {
s.pool = pool
}
func (s *DeviceDBEvCacheConsumer) Prepare(cfg *config.Dendrite) {
}
func (s *DeviceDBEvCacheConsumer) Start() {
go s.startWorker(s.msgChan)
}
func (s *DeviceDBEvCacheConsumer) OnMessage(ctx context.Context, dbEv *dbtypes.DBEvent) error {
s.msgChan <- common.ContextMsg{Ctx: ctx, Msg: dbEv}
return nil
}
func (s *DeviceDBEvCacheConsumer) onDeviceInsert(
msg dbtypes.DeviceInsert,
) error {
conn := s.pool.Pool().Get()
defer conn.Close()
mac := common.GetDeviceMac(msg.DeviceID)
result, err := redis.Values(conn.Do("hgetall", fmt.Sprintf("%s:%s:%s", "device_mac_list", msg.UserID, mac)))
if err != nil {
return err
}
deviceMap := make(map[string]bool)
for _, deviceID := range result {
did := deviceID.([]byte)
deviceMap[string(did)] = true
}
for did := range deviceMap {
if did != msg.DeviceID {
err := conn.Send("del", fmt.Sprintf("%s:%s:%s", "algorithm", msg.UserID, did))
if err != nil {
return err
}
result, err := redis.Values(conn.Do("hgetall", fmt.Sprintf("%s:%s:%s", "device_key_list", msg.UserID, did)))
if err != nil {
return err
} else {
keyMap := make(map[string]bool)
for _, key := range result {
keyID := key.([]byte)
keyMap[string(keyID)] = true
}
for keyID := range keyMap {
err := conn.Send("del", keyID)
if err != nil {
return err
}
err = conn.Send("hdel", fmt.Sprintf("%s:%s:%s", "device_key_list", msg.UserID, did), keyID)
if err != nil {
return err
}
}
}
result, err = redis.Values(conn.Do("hgetall", fmt.Sprintf("%s:%s:%s", "one_time_key_list", msg.UserID, did)))
if err != nil {
return err
} else {
keyMap := make(map[string]bool)
for _, key := range result {
keyID := key.([]byte)
keyMap[string(keyID)] = true
}
for keyID := range keyMap {
err := conn.Send("del", keyID)
if err != nil {
return err
}
err = conn.Send("hdel", fmt.Sprintf("%s:%s:%s", "one_time_key_list", msg.UserID, did), keyID)
if err != nil {
return err
}
}
}
err = conn.Send("del", fmt.Sprintf("%s:%s:%s", "device", msg.UserID, did))
if err != nil {
return err
}
err = conn.Send("hdel", fmt.Sprintf("%s:%s", "device_list", msg.UserID), did)
if err != nil {
return err
}
err = conn.Send("hdel", fmt.Sprintf("%s:%s:%s", "device_mac_list", msg.UserID, mac), did)
if err != nil {
return err
}
}
}
err = conn.Send("hmset", fmt.Sprintf("%s:%s:%s", "device", msg.UserID, msg.DeviceID), "did", msg.DeviceID,
"user_id", msg.UserID, "created_ts", msg.CreatedTs, "display_name", msg.DisplayName, "device_type", msg.DeviceType, "identifier", msg.Identifier)
if err != nil {
return err
}
err = conn.Send("hmset", fmt.Sprintf("%s:%s", "device_list", msg.UserID), msg.DeviceID, msg.DeviceID)
if err != nil {
return err
}
err = conn.Send("hmset", fmt.Sprintf("%s:%s:%s", "device_mac_list", msg.UserID, mac), msg.DeviceID, msg.DeviceID)
if err != nil {
return err
}
return conn.Flush()
}
func (s *DeviceDBEvCacheConsumer) onDeviceRecover(
msg dbtypes.DeviceInsert,
) error {
conn := s.pool.Pool().Get()
defer conn.Close()
mac := common.GetDeviceMac(msg.DeviceID)
err := conn.Send("hmset", fmt.Sprintf("%s:%s:%s", "device", msg.UserID, msg.DeviceID), "did", msg.DeviceID,
"user_id", msg.UserID, "created_ts", msg.CreatedTs, "display_name", msg.DisplayName, "device_type", msg.DeviceType, "identifier", msg.Identifier)
if err != nil {
return err
}
err = conn.Send("hmset", fmt.Sprintf("%s:%s", "device_list", msg.UserID), msg.DeviceID, msg.DeviceID)
if err != nil {
return err
}
err = conn.Send("hmset", fmt.Sprintf("%s:%s:%s", "device_mac_list", msg.UserID, mac), msg.DeviceID, msg.DeviceID)
if err != nil {
return err
}
return conn.Flush()
}
func (s *DeviceDBEvCacheConsumer) onMigDeviceInsert(
msg dbtypes.MigDeviceInsert,
) error {
conn := s.pool.Pool().Get()
defer conn.Close()
err := conn.Send("set", fmt.Sprintf("%s:%s", "tokens", msg.AccessToken), msg.MigAccessToken)
if err != nil {
return err
}
return conn.Flush()
}
func (s *DeviceDBEvCacheConsumer) onDeviceDelete(
msg dbtypes.DeviceDelete,
) error {
conn := s.pool.Pool().Get()
defer conn.Close()
mac := common.GetDeviceMac(msg.DeviceID)
err := conn.Send("del", fmt.Sprintf("%s:%s:%s", "algorithm", msg.UserID, msg.DeviceID))
if err != nil {
return err
}
result, err := redis.Values(conn.Do("hgetall", fmt.Sprintf("%s:%s:%s", "device_key_list", msg.UserID, msg.DeviceID)))
if err != nil {
return err
} else {
keyMap := make(map[string]bool)
for _, key := range result {
keyID := key.([]byte)
keyMap[string(keyID)] = true
}
for keyID := range keyMap {
err := conn.Send("del", keyID)
if err != nil {
return err
}
err = conn.Send("hdel", fmt.Sprintf("%s:%s:%s", "device_key_list", msg.UserID, msg.DeviceID), keyID)
if err != nil {
return err
}
}
}
result, err = redis.Values(conn.Do("hgetall", fmt.Sprintf("%s:%s:%s", "one_time_key_list", msg.UserID, msg.DeviceID)))
if err != nil {
return err
} else {
keyMap := make(map[string]bool)
for _, key := range result {
keyID := key.([]byte)
keyMap[string(keyID)] = true
}
for keyID := range keyMap {
err := conn.Send("del", keyID)
if err != nil {
return err
}
err = conn.Send("hdel", fmt.Sprintf("%s:%s:%s", "one_time_key_list", msg.UserID, msg.DeviceID), keyID)
if err != nil {
return err
}
}
}
err = conn.Send("del", fmt.Sprintf("%s:%s:%s", "device", msg.UserID, msg.DeviceID))
if err != nil {
return err
}
err = conn.Send("hdel", fmt.Sprintf("%s:%s", "device_list", msg.UserID), msg.DeviceID)
if err != nil {
return err
}
err = conn.Send("hdel", fmt.Sprintf("%s:%s:%s", "device_mac_list", msg.UserID, mac), msg.DeviceID)
if err != nil {
return err
}
return conn.Flush()
}
func (s *DeviceDBEvCacheConsumer) onUpdateDeviceActiveTs(
msg dbtypes.DeviceUpdateTs,
) error {
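// editor's note: the device active timestamp is not mirrored in the cache,
// so there is nothing to invalidate here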
return nil
}
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
// Modifications copyright (C) 2020 Finogeeks Co., Ltd
package consumers
import (
"context"
"fmt"
"github.com/finogeeks/ligase/common"
"github.com/finogeeks/ligase/common/config"
"github.com/finogeeks/ligase/model/dbtypes"
"github.com/finogeeks/ligase/skunkworks/log"
"time"
)
func init() {
Register(dbtypes.CATEGORY_E2E_DB_EVENT, NewE2EDBEvCacheConsumer)
}
type E2EDBEvCacheConsumer struct {
pool PoolProviderInterface
//msgChan chan *dbtypes.DBEvent
msgChan chan common.ContextMsg
}
func (s *E2EDBEvCacheConsumer) startWorker(msgChan chan common.ContextMsg) {
var res error
for msg := range msgChan {
output := msg.Msg.(*dbtypes.DBEvent)
if output.IsRecovery {
start := time.Now().UnixNano() / 1000000
key := output.Key
data := output.E2EDBEvents
switch key {
case dbtypes.DeviceKeyInsertKey:
res = s.onDeviceKeyInsert(*data.KeyInsert)
case dbtypes.OneTimeKeyInsertKey:
res = s.onOneTimeKeyInsert(*data.KeyInsert)
case dbtypes.OneTimeKeyDeleteKey:
res = s.onOneTimeKeyDelete(*data.KeyDelete)
case dbtypes.AlInsertKey:
res = s.onAlInsert(*data.AlInsert)
default:
res = nil
log.Infow("encrypt api db event: ignoring unknown output type", log.KeysAndValues{"key", output.Key})
}
if res != nil {
log.Errorf("write encrypt api db event to cache error %v key: %s", res, dbtypes.E2EDBEventKeyToStr(key))
}
now := time.Now().UnixNano() / 1000000
log.Infof("E2EDBEvCacheConsumer process %s takes %d", dbtypes.E2EDBEventKeyToStr(key), now-start)
}
}
}
// NewE2EDBEvCacheConsumer creates a new encrypt api db event consumer. Call Start() to begin consuming db events.
func NewE2EDBEvCacheConsumer() ConsumerInterface {
s := new(E2EDBEvCacheConsumer)
s.msgChan = make(chan common.ContextMsg, 1024)
return s
}
func (s *E2EDBEvCacheConsumer) SetPool(pool PoolProviderInterface) {
s.pool = pool
}
func (s *E2EDBEvCacheConsumer) Prepare(cfg *config.Dendrite) {
}
func (s *E2EDBEvCacheConsumer) Start() {
go s.startWorker(s.msgChan)
}
func (s *E2EDBEvCacheConsumer) OnMessage(ctx context.Context, dbEv *dbtypes.DBEvent) error {
s.msgChan <- common.ContextMsg{Ctx: ctx, Msg: dbEv}
return nil
}
func (s *E2EDBEvCacheConsumer) onDeviceKeyInsert(
msg dbtypes.KeyInsert,
) error {
conn := s.pool.Pool().Get()
defer conn.Close()
keyKey := fmt.Sprintf("%s:%s:%s:%s", "device_key", msg.UserID, msg.DeviceID, msg.Algorithm)
err := conn.Send("hmset", keyKey, "device_id", msg.DeviceID, "user_id", msg.UserID, "key_info", msg.KeyInfo,
"algorithm", msg.Algorithm, "signature", msg.Signature)
if err != nil {
return err
}
err = conn.Send("hmset", fmt.Sprintf("%s:%s:%s", "device_key_list", msg.UserID, msg.DeviceID), keyKey, keyKey)
if err != nil {
return err
}
return conn.Flush()
}
func (s *E2EDBEvCacheConsumer) onOneTimeKeyInsert(
msg dbtypes.KeyInsert,
) error {
conn := s.pool.Pool().Get()
defer conn.Close()
keyKey := fmt.Sprintf("%s:%s:%s:%s:%s", "one_time_key", msg.UserID, msg.DeviceID, msg.KeyID, msg.Algorithm)
err := conn.Send("hmset", keyKey, "device_id", msg.DeviceID, "user_id", msg.UserID, "key_id", msg.KeyID,
"key_info", msg.KeyInfo, "algorithm", msg.Algorithm, "signature", msg.Signature)
if err != nil {
return err
}
err = conn.Send("hmset", fmt.Sprintf("%s:%s:%s", "one_time_key_list", msg.UserID, msg.DeviceID), keyKey, keyKey)
if err != nil {
return err
}
return conn.Flush()
}
func (s *E2EDBEvCacheConsumer) onOneTimeKeyDelete(
msg dbtypes.KeyDelete,
) error {
conn := s.pool.Pool().Get()
defer conn.Close()
keyKey := fmt.Sprintf("%s:%s:%s:%s:%s", "one_time_key", msg.UserID, msg.DeviceID, msg.KeyID, msg.Algorithm)
err := conn.Send("del", keyKey)
if err != nil {
return err
}
err = conn.Send("hdel", fmt.Sprintf("%s:%s:%s", "one_time_key_list", msg.UserID, msg.DeviceID), keyKey)
if err != nil {
return err
}
return conn.Flush()
}
func (s *E2EDBEvCacheConsumer) onAlInsert(
msg dbtypes.AlInsert,
) error {
conn := s.pool.Pool().Get()
defer conn.Close()
err := conn.Send("hmset", fmt.Sprintf("%s:%s:%s", "algorithm", msg.UserID, msg.DeviceID),
"device_id", msg.DeviceID, "user_id", msg.UserID, "algorithms", msg.Algorithm)
if err != nil {
return err
}
return conn.Flush()
}
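Taken together, the Register hook and the SetPool/Start/OnMessage methods above imply a simple driving sequence. The sketch below is illustrative only and not part of this commit (the helper name is hypothetical); it is written as if it lived in the same consumers package, so it can use PoolProviderInterface and the constructor exactly as declared above, with context and dbtypes already imported by the package:
func driveE2ECacheConsumer(pool PoolProviderInterface, ev *dbtypes.DBEvent) error {
	// construct via the registered factory, then attach the cache pool
	c := NewE2EDBEvCacheConsumer().(*E2EDBEvCacheConsumer)
	c.SetPool(pool)
	// Start launches the worker goroutine that drains msgChan
	c.Start()
	// OnMessage only enqueues; the cache write happens asynchronously
	return c.OnMessage(context.Background(), ev)
}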
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
// Modifications copyright (C) 2020 Finogeeks Co., Ltd
package consumers
import (
"context"
"fmt"
"github.com/finogeeks/ligase/common"
"github.com/finogeeks/ligase/common/config"
"github.com/finogeeks/ligase/model/dbtypes"
log "github.com/finogeeks/ligase/skunkworks/log"
"time"
)
func init() {
Register(dbtypes.CATEGORY_PRESENCE_DB_EVENT, NewPresenceDBEvCacheConsumer)
}
type PresenceDBEvCacheConsumer struct {
pool PoolProviderInterface
//msgChan chan *dbtypes.DBEvent
msgChan chan common.ContextMsg
}
func (s *PresenceDBEvCacheConsumer) startWorker(msgChan chan common.ContextMsg) {
var res error
for msg := range msgChan {
output := msg.Msg.(*dbtypes.DBEvent)
start := time.Now().UnixNano() / 1000000
key := output.Key
data := output.PresenceDBEvents
switch key {
case dbtypes.PresencesInsertKey:
res = s.OnInsertPresences(data.PresencesInsert)
default:
res = nil
log.Infow("presence db event: ignoring unknown output type", log.KeysAndValues{"key", output.Key})
}
if res != nil {
log.Errorf("write presence db event to cache error %v key: %s", res, dbtypes.PresenceDBEventKeyToStr(key))
}
now := time.Now().UnixNano() / 1000000
log.Infof("PresenceDBEvCacheConsumer process %s takes %d", dbtypes.PresenceDBEventKeyToStr(key), now-start)
}
}
func NewPresenceDBEvCacheConsumer() ConsumerInterface {
s := new(PresenceDBEvCacheConsumer)
s.msgChan = make(chan common.ContextMsg, 4096)
return s
}
func (s *PresenceDBEvCacheConsumer) SetPool(pool PoolProviderInterface) {
s.pool = pool
}
func (s *PresenceDBEvCacheConsumer) Prepare(cfg *config.Dendrite) {
}
func (s *PresenceDBEvCacheConsumer) Start() {
go s.startWorker(s.msgChan)
}
func (s *PresenceDBEvCacheConsumer) OnMessage(ctx context.Context, dbEv *dbtypes.DBEvent) error {
s.msgChan <- common.ContextMsg{Ctx: ctx, Msg: dbEv}
return nil
}
func (s *PresenceDBEvCacheConsumer) OnInsertPresences(
msg *dbtypes.PresencesInsert,
) error {
conn := s.pool.Pool().Get()
defer conn.Close()
presencesKey := fmt.Sprintf("%s:%s", "presences", msg.UserID)
err := conn.Send("hmset", presencesKey, "user_id", msg.UserID, "status", msg.Status, "status_msg", msg.StatusMsg, "ext_status_msg", msg.ExtStatusMsg)
if err != nil {
return err
}
return conn.Flush()
}
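The Send/Flush pairing above matches redigo's Conn interface, so, assuming the pool behind PoolProviderInterface wraps a redigo redis.Pool (an assumption, not stated in this commit), the hash written by OnInsertPresences can be read back as sketched below; the Redis address and user ID are placeholders:
package main

import (
	"fmt"

	"github.com/gomodule/redigo/redis"
)

func main() {
	// assumed local Redis; replace with the deployment's address
	conn, err := redis.Dial("tcp", "127.0.0.1:6379")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// key layout follows the fmt.Sprintf in OnInsertPresences
	key := fmt.Sprintf("%s:%s", "presences", "@alice:example.org")
	fields, err := redis.StringMap(conn.Do("hgetall", key))
	if err != nil {
		panic(err)
	}
	fmt.Println(fields["status"], fields["status_msg"])
}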