提交 284ff345 编写于 作者: Huan (李卓桓)'s avatar Huan (李卓桓)

fix PadchatRpc restart hang for 20 seconds bug

上级 a1bb0613
......@@ -37,6 +37,7 @@ test('PadchatManager() cache should be release and can be re-init again.', async
for (let i = 0; i < 3; i++) {
await manager.initCache('fake-token', 'fake-self-id')
await manager.releaseCache()
t.pass('init/release-ed at #' + i)
}
t.pass('PadchatManager() cache init/release/init successed.')
} catch (e) {
......@@ -44,7 +45,7 @@ test('PadchatManager() cache should be release and can be re-init again.', async
}
})
test.only('PadchatManager() should can be restart() after a start()', async t => {
test('PadchatManager() should can be able to restart() many times for one instance', async t => {
const manager = new PadchatManager({
memory : new MemoryCard(),
token : 'mock token',
......@@ -54,8 +55,9 @@ test.only('PadchatManager() should can be restart() after a start()', async t =>
for (let i = 0; i < 3; i++) {
await manager.start()
await manager.stop()
t.pass('restarted at #' + i)
}
t.pass('PadchatManager() start/restart successed.')
t.pass('PadchatManager() restart successed.')
} catch (e) {
console.error(e)
t.fail(e)
......
......@@ -78,9 +78,9 @@ export class PadchatRpc extends EventEmitter {
private socket? : WebSocket
private readonly jsonRpc : any // Peer
private readonly throttleQueue : ThrottleQueue<string>
private readonly debounceQueue : DebounceQueue<string>
private readonly logoutThrottleQueue : ThrottleQueue<string>
private throttleQueue? : ThrottleQueue<string>
private debounceQueue? : DebounceQueue<string>
private logoutThrottleQueue? : ThrottleQueue<string>
private throttleSubscription? : Subscription
private debounceSubscription? : Subscription
......@@ -94,22 +94,6 @@ export class PadchatRpc extends EventEmitter {
log.verbose('PadchatRpc', 'constructor(%s, %s)', endpoint, token)
this.jsonRpc = new Peer()
/**
* Throttle for 10 seconds
*/
this.throttleQueue = new ThrottleQueue<string>(1000 * 10)
/**
* Debounce for 20 seconds
*/
this.debounceQueue = new DebounceQueue<string>(1000 * 10 * 2)
/**
* Throttle for 5 seconds for the `logout` event:
* we should only fire once for logout,
* but the server will send many events of 'logout'
*/
this.logoutThrottleQueue = new ThrottleQueue<string>(1000 * 5)
}
public async start(): Promise<void> {
......@@ -207,6 +191,12 @@ export class PadchatRpc extends EventEmitter {
*/
ws.on('close', e => {
log.warn('PadchatRpc', 'initWebSocket() ws.on(close) %s', e)
if (!this.logoutThrottleQueue) {
log.warn('PadchatRpc', 'initWebSocket() ws.on(close) logoutThrottleQueue not exist')
return
}
this.logoutThrottleQueue.next('ws.on(close, ' + e)
})
......@@ -214,10 +204,18 @@ export class PadchatRpc extends EventEmitter {
* use websocket message as heartbeat source
*/
ws.on('message', () => {
if (!this.throttleQueue || !this.debounceQueue) {
log.warn('PadchatRpc', 'initWebSocket() ws.on(message) throttleQueue or debounceQueue not exist')
return
}
this.throttleQueue.next('ws.on(message)')
this.debounceQueue.next('ws.on(message)')
})
ws.on('pong', data => {
if (!this.throttleQueue || !this.debounceQueue) {
log.warn('PadchatRpc', 'initWebSocket() ws.on(pong) throttleQueue or debounceQueue not exist')
return
}
this.throttleQueue.next(data.toString())
this.debounceQueue.next(data.toString())
})
......@@ -227,6 +225,11 @@ export class PadchatRpc extends EventEmitter {
private initHeartbeat(): void {
log.verbose('PadchatRpc', 'initHeartbeat()')
if (!this.throttleQueue || !this.debounceQueue) {
log.warn('PadchatRpc', 'initHeartbeat() throttleQueue or debounceQueue not exist')
return
}
if (this.throttleSubscription || this.debounceSubscription) {
throw new Error('subscription exist when initHeartbeat')
}
......@@ -291,6 +294,22 @@ export class PadchatRpc extends EventEmitter {
private startQueues() {
log.verbose('PadchatRpc', 'startQueues()')
/**
* Throttle for 10 seconds
*/
this.throttleQueue = new ThrottleQueue<string>(1000 * 10)
/**
* Debounce for 20 seconds
*/
this.debounceQueue = new DebounceQueue<string>(1000 * 10 * 2)
/**
* Throttle for 5 seconds for the `logout` event:
* we should only fire once for logout,
* but the server will send many events of 'logout'
*/
this.logoutThrottleQueue = new ThrottleQueue<string>(1000 * 5)
this.initHeartbeat()
if (this.logoutThrottleSubscription) {
......@@ -313,10 +332,27 @@ export class PadchatRpc extends EventEmitter {
this.debounceSubscription.unsubscribe()
this.logoutThrottleSubscription.unsubscribe()
this.throttleSubscription.unsubscribe()
// Clean internal subscriptions
this.debounceSubscription = undefined
this.logoutThrottleSubscription = undefined
this.throttleSubscription = undefined
}
if ( this.debounceQueue
&& this.logoutThrottleQueue
&& this.throttleQueue
) {
/**
* Queues clean internal subscriptions
*/
this.debounceQueue.unsubscribe()
this.logoutThrottleQueue.unsubscribe()
this.throttleQueue.unsubscribe()
this.debounceQueue = undefined
this.logoutThrottleQueue = undefined
this.throttleQueue = undefined
} else {
log.warn('PadchatRpc', 'stop() subscript not exist')
}
......@@ -345,7 +381,11 @@ export class PadchatRpc extends EventEmitter {
payload.type,
JSON.stringify(payload),
)
this.logoutThrottleQueue.next(payload.msg || 'onSocket(logout)')
if (this.logoutThrottleQueue) {
this.logoutThrottleQueue.next(payload.msg || 'onSocket(logout)')
} else {
log.warn('PadchatRpc', 'onSocket() logout logoutThrottleQueue not exist')
}
return
}
......
......@@ -513,7 +513,10 @@ export class PuppetPadchat extends Puppet {
public async contactAvatar(contactId: string, file: FileBox) : Promise<void>
public async contactAvatar(contactId: string, file?: FileBox): Promise<void | FileBox> {
log.verbose('PuppetPadchat', 'contactAvatar(%s, %s)', contactId, file ? file.name : '')
log.verbose('PuppetPadchat', 'contactAvatar(%s%s)',
contactId,
file ? (', ' + file.name) : '',
)
/**
* 1. set avatar for user self
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册