io.ts 12.3 KB
Newer Older
1
/**
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
2
 *   Wechaty - https://github.com/chatie/wechaty
3
 *
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
4
 *   @copyright 2016-2018 Huan LI <zixia@zixia.net>
5
 *
6 7 8
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
9
 *
10 11 12 13 14 15 16
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
17 18
 *
 */
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
19
import { StateSwitch }  from 'state-switch'
20
import WebSocket        from 'ws'
21

22
import {
23
  Message,
24
}                 from './user'
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
25 26

import {
27
  PuppetQrcodeScanEvent,
28
}                 from 'wechaty-puppet'
29

30
import {
31
  config,
L
lijiarui 已提交
32
  log,
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
33
}                 from './config'
34 35 36
import {
  AnyFunction,
}                 from './types'
37 38 39
import {
  Wechaty,
}                 from './wechaty'
40

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
41
export interface IoOptions {
L
lijiarui 已提交
42 43 44 45
  wechaty:    Wechaty,
  token:      string,
  apihost?:   string,
  protocol?:  string,
46 47
}

48
export const IO_EVENT_DICT = {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
49 50 51 52 53 54 55 56 57 58
  botie     : 'tbw',
  error     : 'tbw',
  heartbeat : 'tbw',
  login     : 'tbw',
  logout    : 'tbw',
  message   : 'tbw',
  raw       : 'tbw',
  reset     : 'tbw',
  scan      : 'tbw',
  shutdown  : 'tbw',
59 60
  sys       : 'tbw',
  update    : 'tbw',
61 62 63
}

type IoEventName = keyof typeof IO_EVENT_DICT
64

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
65
interface IoEventScan {
66
  name    : 'scan',
67
  payload : PuppetQrcodeScanEvent,
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
68 69 70
}

interface IoEventAny {
L
lijiarui 已提交
71 72
  name:     IoEventName,
  payload:  any,
73
}
74

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
75 76
type IoEvent = IoEventScan | IoEventAny

77
export class Io {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
78
  private readonly id       : string
79
  private readonly protocol : string
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
80 81
  private eventBuffer       : IoEvent[] = []
  private ws                : undefined | WebSocket
82

83
  private readonly state = new StateSwitch('Io', log)
84

85 86
  private reconnectTimer?   : NodeJS.Timer
  private reconnectTimeout? : number
87

88 89
  private lifeTimer? : NodeJS.Timer

90
  private onMessage: undefined | AnyFunction
91

92
  private scanPayload?: PuppetQrcodeScanEvent
93

94
  constructor (
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
95 96 97
    private options: IoOptions,
  ) {
    options.apihost   = options.apihost   || config.apihost
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
98
    options.protocol  = options.protocol  || config.default.DEFAULT_PROTOCOL
99

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
100
    this.id = options.wechaty.id
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
101

102
    this.protocol = options.protocol + '|' + options.wechaty.id
Huan (李卓桓)'s avatar
wip...  
Huan (李卓桓) 已提交
103
    log.verbose('Io', 'instantiated with apihost[%s], token[%s], protocol[%s], cuid[%s]',
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
104 105 106
                      options.apihost,
                      options.token,
                      options.protocol,
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
107
                      this.id,
108
              )
109 110
  }

111
  public toString () {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
112 113
    return `Io<${this.options.token}>`
  }
114

115
  private connected () {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
116 117
    return this.ws && this.ws.readyState === WebSocket.OPEN
  }
118

119
  public async start (): Promise<void> {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
120
    log.verbose('Io', 'start()')
121

122 123 124 125
    if (this.lifeTimer) {
      throw new Error('lifeTimer exist')
    }

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
126
    this.state.on('pending')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
127

128
    try {
129
      this.initEventHook()
130 131 132

      this.ws = await this.initWebSocket()

133
      this.options.wechaty.on('scan', (qrcode, status) => {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
134 135
        this.scanPayload = {
          ...this.scanPayload,
136 137
          qrcode,
          status,
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
138
        }
139
      })
140 141 142 143

      this.lifeTimer = setInterval(() => {
        if (this.ws && this.connected()) {
          log.silly('Io', 'start() setInterval() ws.ping()')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
144
          // TODO: check 'pong' event on ws
145 146 147 148
          this.ws.ping()
        }
      }, 1000 * 10)

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
149
      this.state.on(true)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
150

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
151
      return
152
    } catch (e) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
153
      log.warn('Io', 'start() exception: %s', e.message)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
154
      this.state.off(true)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
155
      throw e
156
    }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
157 158
  }

159
  private initEventHook () {
Huan (李卓桓)'s avatar
log  
Huan (李卓桓) 已提交
160
    log.verbose('Io', 'initEventHook()')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
161
    const wechaty = this.options.wechaty
162

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
163
    wechaty.on('error'    , error =>        this.send({ name: 'error',      payload: error }))
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
164
    wechaty.on('heartbeat', data  =>        this.send({ name: 'heartbeat',  payload: { cuid: this.id, data } }))
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
165 166
    wechaty.on('login',     user =>         this.send({ name: 'login',      payload: user }))
    wechaty.on('logout' ,   user =>         this.send({ name: 'logout',     payload: user }))
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
167
    wechaty.on('message',   message =>      this.ioMessage(message))
168 169 170 171

    // FIXME: payload schema need to be defined universal
    // wechaty.on('scan',      (url, code) =>  this.send({ name: 'scan',       payload: { url, code } }))
    wechaty.on('scan',      (qrcode, status) =>  this.send({ name: 'scan',  payload: { qrcode, status } } as IoEventScan ))
172 173

    return
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
174 175
  }

176
  private async initWebSocket (): Promise<WebSocket> {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
177
    log.verbose('Io', 'initWebSocket()')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
178
    // this.state.current('on', false)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
179 180 181

    // const auth = 'Basic ' + new Buffer(this.setting.token + ':X').toString('base64')
    const auth = 'Token ' + this.options.token
182
    const headers = { Authorization: auth }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
183 184 185 186 187 188 189

    if (!this.options.apihost) {
      throw new Error('no apihost')
    }
    let endpoint = 'wss://' + this.options.apihost + '/v0/websocket'

    // XXX quick and dirty: use no ssl for APIHOST other than official
190 191
    // FIXME: use a configuarable VARIABLE for the domain name at here:
    if (!/api\.chatie\.io/.test(this.options.apihost)) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
192 193 194 195 196 197 198
      endpoint = 'ws://' + this.options.apihost + '/v0/websocket'
    }

    const ws = this.ws = new WebSocket(endpoint, this.protocol, { headers })

    ws.on('open',     () => this.wsOnOpen(ws))
    ws.on('message',  data => this.wsOnMessage(data))
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
199
    ws.on('error',    e => this.wsOnError(e))
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
200
    ws.on('close',    (code, reason) => this.wsOnClose(ws, code, reason))
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
201

202 203 204 205 206 207
    await new Promise((resolve, reject) => {
      ws.once('open', resolve)
      ws.once('error', reject)
      ws.once('close', reject)
    })

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
208 209 210
    return ws
  }

211
  private async wsOnOpen (ws: WebSocket): Promise<void> {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
212 213 214 215 216 217
    if (this.protocol !== ws.protocol) {
      log.error('Io', 'initWebSocket() require protocol[%s] failed', this.protocol)
      // XXX deal with error?
    }
    log.verbose('Io', 'initWebSocket() connected with protocol [%s]', ws.protocol)
    // this.currentState('connected')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
218
    // this.state.current('on')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
219 220 221 222

    // FIXME: how to keep alive???
    // ws._socket.setKeepAlive(true, 100)

223
    this.reconnectTimeout = undefined
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
224 225

    const name    = 'sys'
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
226
    const payload = 'Wechaty version ' + this.options.wechaty.version() + ` with CUID: ${this.id}`
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
227 228 229 230 231

    const initEvent: IoEvent = {
      name,
      payload,
    }
ruiruibupt's avatar
ruiruibupt 已提交
232
    await this.send(initEvent)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
233 234
  }

235
  private async wsOnMessage (data: WebSocket.Data) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
    log.silly('Io', 'initWebSocket() ws.on(message): %s', data)
    // flags.binary will be set if a binary data is received.
    // flags.masked will be set if the data was masked.

    if (typeof data !== 'string') {
      throw new Error('data should be string...')
    }

    const ioEvent: IoEvent = {
      name    : 'raw',
      payload : data,
    }

    try {
      const obj = JSON.parse(data)
      ioEvent.name    = obj.name
      ioEvent.payload = obj.payload
    } catch (e) {
      log.verbose('Io', 'on(message) recv a non IoEvent data[%s]', data)
    }

    switch (ioEvent.name) {
      case 'botie':
        const payload = ioEvent.payload
        if (payload.onMessage) {
          const script = payload.script
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
262 263 264 265 266 267 268 269 270 271 272
          try {
            /* tslint:disable:no-eval */
            const fn = eval(script)
            if (typeof fn === 'function') {
              this.onMessage = fn
            } else {
              log.warn('Io', 'server pushed function is invalid')
            }
          } catch (e) {
            log.warn('Io', 'server pushed function exception: %s', e)
            this.options.wechaty.emit('error', e)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
273 274 275 276 277 278
          }
        }
        break

      case 'reset':
        log.verbose('Io', 'on(reset): %s', ioEvent.payload)
ruiruibupt's avatar
ruiruibupt 已提交
279
        await this.options.wechaty.reset(ioEvent.payload)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
280 281 282
        break

      case 'shutdown':
283
        log.info('Io', 'on(shutdown): %s', ioEvent.payload)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
284 285 286 287
        process.exit(0)
        break

      case 'update':
288 289
        log.verbose('Io', 'on(update): %s', ioEvent.payload)

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
290 291 292 293 294 295 296 297
        const wechaty = this.options.wechaty
        if (wechaty.logonoff()) {
          const loginEvent: IoEvent = {
            name    : 'login',
            payload : {
              id   : wechaty.userSelf().id,
              name : wechaty.userSelf().name(),
            },
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
298
          }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
299
          await this.send(loginEvent)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
300 301
        }

302
        if (this.scanPayload) {
303
          const scanEvent: IoEventScan = {
304
            name:     'scan',
305
            payload:  this.scanPayload,
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
306
          }
ruiruibupt's avatar
ruiruibupt 已提交
307
          await this.send(scanEvent)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
308 309 310 311 312 313 314 315
        }

        break

      case 'sys':
        // do nothing
        break

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
316
      case 'logout':
317
        log.info('Io', 'on(logout): %s', ioEvent.payload)
ruiruibupt's avatar
ruiruibupt 已提交
318
        await this.options.wechaty.logout()
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
319 320
        break

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
321 322 323 324 325 326
      default:
        log.warn('Io', 'UNKNOWN on(%s): %s', ioEvent.name, ioEvent.payload)
        break
    }
  }

327 328
  // FIXME: it seems the parameter `e` might be `undefined`.
  // @types/ws might has bug for `ws.on('error',    e => this.wsOnError(e))`
329
  private wsOnError (e?: Error) {
330
    log.warn('Io', 'initWebSocket() error event[%s]', e && e.message)
331 332 333
    if (!e) {
      return
    }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
334 335 336 337 338 339 340 341 342
    this.options.wechaty.emit('error', e)

    // when `error`, there must have already a `close` event
    // we should not call this.reconnect() again
    //
    // this.close()
    // this.reconnect()
  }

343
  private wsOnClose (
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
344 345 346 347
    ws      : WebSocket,
    code    : number,
    message : string,
  ): void {
348 349 350 351 352
    if (this.state.on()) {
      log.warn('Io', 'initWebSocket() close event[%d: %s]', code, message)
      ws.close()
      this.reconnect()
    }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
353 354
  }

355
  private reconnect () {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
356 357
    log.verbose('Io', 'reconnect()')

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
358
    if (this.state.off()) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
      log.warn('Io', 'reconnect() canceled because state.target() === offline')
      return
    }

    if (this.connected()) {
      log.warn('Io', 'reconnect() on a already connected io')
      return
    }
    if (this.reconnectTimer) {
      log.warn('Io', 'reconnect() on a already re-connecting io')
      return
    }

    if (!this.reconnectTimeout) {
      this.reconnectTimeout = 1
    } else if (this.reconnectTimeout < 10 * 1000) {
      this.reconnectTimeout *= 3
    }

    log.warn('Io', 'reconnect() will reconnect after %d s', Math.floor(this.reconnectTimeout / 1000))
379
    this.reconnectTimer = setTimeout(async _ => {
380
      this.reconnectTimer = undefined
381
      await this.initWebSocket()
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
382 383 384
    }, this.reconnectTimeout)// as any as NodeJS.Timer
  }

385
  private async send (ioEvent?: IoEvent): Promise<void> {
386 387 388 389 390 391
    if (!this.ws) {
      throw new Error('no ws')
    }

    const ws = this.ws

392
    if (ioEvent) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
393
      log.silly('Io', 'send(%s)', JSON.stringify(ioEvent))
394
      this.eventBuffer.push(ioEvent)
395
    } else { log.silly('Io', 'send()') }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
396 397

    if (!this.connected()) {
398
      log.verbose('Io', 'send() without a connected websocket, eventBuffer.length = %d', this.eventBuffer.length)
399
      return
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
400 401
    }

402
    const list: Array<Promise<any>> = []
403
    while (this.eventBuffer.length) {
404
      const p = new Promise((resolve, reject) => ws.send(
405
        JSON.stringify(
L
lijiarui 已提交
406 407 408
          this.eventBuffer.shift(),
        ),
        (err: Error) => {
409 410 411 412 413
          if (err)  {
            reject(err)
          } else {
            resolve()
          }
L
lijiarui 已提交
414
        },
415 416 417 418 419 420 421 422 423
      ))
      list.push(p)
    }

    try {
      await Promise.all(list)
    } catch (e) {
      log.error('Io', 'send() exceptio: %s', e.stack)
      throw e
424
    }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
425
  }
426

427
  public async stop (): Promise<void> {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
428 429
    log.verbose('Io', 'stop()')

430 431 432 433
    if (!this.ws) {
      throw new Error('no ws')
    }

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
434
    this.state.off('pending')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
435

436
    // try to send IoEvents in buffer
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
437
    await this.send()
438
    this.eventBuffer = []
439

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
440 441
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
442
      this.reconnectTimer = undefined
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
443
    }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
444

445 446 447 448 449
    if (this.lifeTimer) {
      clearInterval(this.lifeTimer)
      this.lifeTimer = undefined
    }

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
450
    this.ws.close()
451 452 453 454 455 456 457 458
    await new Promise(r => {
      if (this.ws) {
        this.ws.once('close', r)
      } else {
        r()
      }
    })
    this.ws = undefined
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
459

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
460
    this.state.off(true)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
461

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
462
    return
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
463
  }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
464 465 466 467 468
  /**
   *
   * Prepare to be overwriten by server setting
   *
   */
469
  private async ioMessage (m: Message): Promise<void> {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
470
    log.silly('Io', 'ioMessage() is a nop function before be overwriten from cloud')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
471 472 473
    if (typeof this.onMessage === 'function') {
      await this.onMessage(m)
    }
474
  }
475

476
}