提交 e3bacd54 编写于 作者: Huan (李卓桓)'s avatar Huan (李卓桓)

Add DelayQueueExecutor to manage background API tasks (#1305)

上级 82518d57
......@@ -5,6 +5,12 @@ import fs from 'fs-extra'
import { MemoryCard } from 'memory-card'
import { StateSwitch } from 'state-switch'
import { FlashStoreSync } from 'flash-store'
import {
Subscription,
} from 'rxjs'
import {
DelayQueueExector,
} from 'rx-queue'
import Misc from '../misc'
......@@ -54,42 +60,23 @@ export interface BridgeOptions {
}
export class Bridge extends PadchatRpc {
// private readonly padchatRpc : PadchatRpc
// private autoData : AutoDataType
private memorySlot: PadchatMemorySlot
private loginScanQrcode? : string
private loginScanStatus? : number
private loginTimer?: NodeJS.Timer
private loginTimer? : NodeJS.Timer
private selfId? : string
// private selfName? : string
// private password? : string
// private nickname? : string
private cacheRoomRawPayload? : FlashStoreSync<string, PadchatRoomPayload>
private cacheContactRawPayload? : FlashStoreSync<string, PadchatContactPayload>
private cacheRoomMemberRawPayload? : FlashStoreSync<string, {
[contactId: string]: PadchatRoomMemberPayload,
}>
/**
* cacheRoomMemberRawPayload[roomId1] = {
* contactId1: payload1,
* contactId2: payload2
* }
*
* cacheRoomMemberRawPayload[roomId2] = {
* contactId2: payload3,
* contactId3: payload4,
* }
*/
private cacheRoomMemberRawPayload? : FlashStoreSync<
string,
{
[contactId: string]: PadchatRoomMemberPayload,
}
>
private readonly state: StateSwitch
private readonly state : StateSwitch
private readonly delayQueueExecutor : DelayQueueExector
private delayQueueExecutorSubscription? : Subscription
constructor(
public options: BridgeOptions,
......@@ -103,6 +90,13 @@ export class Bridge extends PadchatRpc {
// this.padchatRpc = new PadchatRpc(options.endpoint, options.token)
this.state = new StateSwitch('PuppetPadchatBridge')
/**
* Executer Queue: execute one task at a time,
* delay between them for 3 second
*/
this.delayQueueExecutor = new DelayQueueExector(1000 * 3)
}
private async initCache(
......@@ -176,6 +170,7 @@ export class Bridge extends PadchatRpc {
this.cacheContactRawPayload = undefined
this.cacheRoomMemberRawPayload = undefined
this.cacheRoomRawPayload = undefined
} else {
log.warn('PuppetPadchatBridge', 'releaseCache() cache not exist.')
}
......@@ -190,6 +185,14 @@ export class Bridge extends PadchatRpc {
this.state.on('pending')
if (this.delayQueueExecutorSubscription) {
throw new Error('this.delayExecutorSubscription exist')
} else {
this.delayQueueExecutorSubscription = this.delayQueueExecutor.subscribe(unit => {
log.verbose('PuppetPadchatBridge', 'startQueues() delayQueueExecutor.subscribe(%s) executed', unit.name)
})
}
this.memorySlot = {
...this.memorySlot,
...await this.options.memory.get<PadchatMemorySlot>(MEMORY_SLOT_NAME),
......@@ -198,15 +201,6 @@ export class Bridge extends PadchatRpc {
// await this.padchatRpc.start()
await super.start()
// this.padchatRpc.on('message', messageRawPayload => {
// log.silly('PuppetPadchatBridge', 'start() padchatRpc.on(message)')
// this.emit('message', messageRawPayload)
// })
// No need to call logout() in bridge, because PupetPadchat will call logout() when received the 'logout' event
// this.padchatRpc.on('logout', data => {
// this.on('logout', () => this.logout())
await this.tryLoad62Data()
const succeed = await this.tryAutoLogin(this.memorySlot)
......@@ -222,6 +216,13 @@ export class Bridge extends PadchatRpc {
this.state.off('pending')
if (this.delayQueueExecutorSubscription) {
this.delayQueueExecutorSubscription.unsubscribe()
this.delayQueueExecutor.unsubscribe()
} else {
log.warn('PuppetPadchatBridge', 'stop() subscript not exist')
}
this.stopCheckScan()
// await this.padchatRpc.stop()
......@@ -588,10 +589,13 @@ export class Bridge extends PadchatRpc {
}
return memorySlot
} else {
/**
* 4. New user login, generate 62data for it
*/
}
/**
* 4. New user login, generate 62data for it
*/
// Build a new code block to make tslint happy: no-shadow-variable
if (true) {
log.verbose('PuppetPadchatBridge', 'refresh62Data() user switch detected: from "%s" to "%s"',
memorySlot.currentUserId,
userId,
......@@ -742,7 +746,7 @@ export class Bridge extends PadchatRpc {
// const syncContactList = await this.padchatRpc.WXSyncContact()
const syncContactList = await this.WXSyncContact()
await new Promise(r => setTimeout(r, 1 * 1000))
await new Promise(r => setTimeout(r, 3 * 1000))
// console.log('syncContactList:', syncContactList)
......@@ -785,7 +789,15 @@ export class Bridge extends PadchatRpc {
const roomPayload = syncContact as PadchatRoomPayload
this.cacheRoomRawPayload.set(roomId, roomPayload)
await this.syncRoomMember(roomId)
/**
* Use delay queue executor to sync room:
* add syncRoomMember task to the queue
*/
this.delayQueueExecutor.execute(
() => this.syncRoomMember(roomId),
'syncRoomMember(' + roomId + ')',
)
} else if (pfHelper.isContactId(syncContact.user_name)) {
/**
......
import { EventEmitter } from 'events'
// import cuid from 'cuid'
import WebSocket from 'ws'
import WebSocket from 'ws'
import { Subscription } from 'rxjs'
import Peer, {
parse,
} from 'json-rpc-peer'
} from 'json-rpc-peer'
import {
ThrottleQueue,
DebounceQueue,
} from 'rx-queue'
} from 'rx-queue'
// , {
// JsonRpcPayload,
......@@ -74,9 +74,13 @@ export class PadchatRpc extends EventEmitter {
private socket? : WebSocket
private readonly jsonRpc : any // Peer
private readonly throttleQueue: ThrottleQueue<string>
private readonly debounceQueue: DebounceQueue<string>
private readonly logoutThrottleQueue: ThrottleQueue<string>
private readonly throttleQueue : ThrottleQueue<string>
private readonly debounceQueue : DebounceQueue<string>
private readonly logoutThrottleQueue : ThrottleQueue<string>
private throttleSubscription? : Subscription
private debounceSubscription? : Subscription
private logoutThrottleSubscription? : Subscription
constructor(
protected endpoint : string,
......@@ -113,11 +117,7 @@ export class PadchatRpc extends EventEmitter {
await this.init()
await this.WXInitialize()
await this.initHearteat()
this.logoutThrottleQueue.subscribe(msg => {
this.destroy(msg)
})
this.startQueues()
}
protected async initJsonRpc(): Promise<void> {
......@@ -220,10 +220,14 @@ export class PadchatRpc extends EventEmitter {
}
private initHearteat(): void {
private initHeartbeat(): void {
log.verbose('PadchatRpc', 'initHeartbeat()')
this.throttleQueue.subscribe(e => {
if (this.throttleSubscription || this.debounceSubscription) {
throw new Error('subscription exist when initHeartbeat')
}
this.throttleSubscription = this.throttleQueue.subscribe(e => {
/**
* This block will only be run once in a period,
* no matter than how many message the queue received.
......@@ -232,7 +236,7 @@ export class PadchatRpc extends EventEmitter {
this.emit('heartbeat', e)
})
this.debounceQueue.subscribe(e => {
this.debounceSubscription = this.debounceQueue.subscribe(e => {
/**
* This block will be run when:
* the queue did not receive any message after a period.
......@@ -264,6 +268,8 @@ export class PadchatRpc extends EventEmitter {
public stop(): void {
log.verbose('PadchatRpc', 'stop()')
this.stopQueues()
this.jsonRpc.removeAllListeners()
// TODO: use huan's version of JsonRpcPeer, to support end at here.
// this.jsonRpc.end()
......@@ -278,6 +284,40 @@ export class PadchatRpc extends EventEmitter {
}
}
private startQueues() {
log.verbose('PadchatRpc', 'startQueues()')
this.initHeartbeat()
if (this.logoutThrottleSubscription) {
throw new Error('this.logoutThrottleSubscription exist')
} else {
this.logoutThrottleSubscription = this.logoutThrottleQueue.subscribe(msg => {
this.destroy(msg)
})
}
}
private stopQueues() {
log.verbose('PadchatRpc', 'stopQueues()')
if ( this.throttleSubscription
&& this.debounceSubscription
&& this.logoutThrottleSubscription
) {
// Clean external subscriptions
this.debounceSubscription.unsubscribe()
this.logoutThrottleSubscription.unsubscribe()
this.throttleSubscription.unsubscribe()
// Clean internal subscriptions
this.debounceQueue.unsubscribe()
this.logoutThrottleQueue.unsubscribe()
this.throttleQueue.unsubscribe()
} else {
log.warn('PadchatRpc', 'stop() subscript not exist')
}
}
private async rpcCall(
apiName : string,
...params : string[]
......
......@@ -32,7 +32,6 @@
180
],
"callable-types": true,
"import-blacklist": [true, "rxjs"],
"interface-over-type-literal": true,
"no-empty-interface": true,
"no-string-throw": true,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册