io.ts 12.3 KB
Newer Older
1
/**
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
2
 *   Wechaty - https://github.com/chatie/wechaty
3
 *
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
4
 *   @copyright 2016-2018 Huan LI <zixia@zixia.net>
5
 *
6 7 8
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
9
 *
10 11 12 13 14 15 16
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
17 18
 *
 */
19
import WebSocket from 'ws'
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
20
import StateSwitch    from 'state-switch'
21

22
import {
23
  Message,
24
}                 from './message'
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
25 26

import {
27
  PuppetScanEvent,
28
}                 from './puppet/'
29

30
import {
31
  config,
L
lijiarui 已提交
32
  log,
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
33
}                 from './config'
34 35 36
import {
  Wechaty,
}                 from './wechaty'
37

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
38
export interface IoOptions {
L
lijiarui 已提交
39 40 41 42
  wechaty:    Wechaty,
  token:      string,
  apihost?:   string,
  protocol?:  string,
43 44
}

45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
export const IO_EVENT_DICT = {
  botie: 'tbw',
  error: 'tbw',
  heartbeat: 'tbw',
  login: 'tbw',
  logout: 'tbw',
  message: 'tbw',
  update: 'tbw',
  raw: 'tbw',
  reset: 'tbw',
  scan: 'tbw',
  sys: 'tbw',
  shutdown: 'tbw',
}

type IoEventName = keyof typeof IO_EVENT_DICT
61

62
interface IoEvent {
L
lijiarui 已提交
63 64
  name:     IoEventName,
  payload:  any,
65
}
66

67
export class Io {
68 69
  private readonly cuid     : string
  private readonly protocol : string
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
70
  private eventBuffer : IoEvent[] = []
71
  private ws          : undefined | WebSocket
72

73
  private readonly state = new StateSwitch('Io', log)
74

75 76
  private reconnectTimer?   : NodeJS.Timer
  private reconnectTimeout? : number
77

78
  private onMessage: undefined | Function
79

80
  private scanPayload?: PuppetScanEvent
81

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
82 83 84 85
  constructor(
    private options: IoOptions,
  ) {
    options.apihost   = options.apihost   || config.apihost
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
86
    options.protocol  = options.protocol  || config.default.DEFAULT_PROTOCOL
87

88
    this.cuid = options.wechaty.id
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
89

90
    this.protocol = options.protocol + '|' + options.wechaty.id
Huan (李卓桓)'s avatar
wip...  
Huan (李卓桓) 已提交
91
    log.verbose('Io', 'instantiated with apihost[%s], token[%s], protocol[%s], cuid[%s]',
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
92 93 94
                      options.apihost,
                      options.token,
                      options.protocol,
Huan (李卓桓)'s avatar
wip...  
Huan (李卓桓) 已提交
95
                      this.cuid,
96
              )
97 98
  }

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
99 100 101
  public toString() {
    return `Io<${this.options.token}>`
  }
102

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
103 104 105
  private connected() {
    return this.ws && this.ws.readyState === WebSocket.OPEN
  }
106

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
107
  public async init(): Promise<void> {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
108
    log.verbose('Io', 'init()')
109

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
110
    this.state.on('pending')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
111

112 113
    try {
      await this.initEventHook()
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
114
      this.ws = this.initWebSocket()
115
      this.options.wechaty.on('scan', (qrcode, status) => {
116
        this.scanPayload = Object.assign(this.scanPayload || {}, {
117 118
          qrcode,
          status,
119
        })
120
      })
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
121
      this.state.on(true)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
122

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
123
      return
124
    } catch (e) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
125
      log.warn('Io', 'init() exception: %s', e.message)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
126
      this.state.off(true)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
127
      throw e
128
    }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
129 130
  }

131
  private initEventHook() {
Huan (李卓桓)'s avatar
log  
Huan (李卓桓) 已提交
132
    log.verbose('Io', 'initEventHook()')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
133
    const wechaty = this.options.wechaty
134

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
135
    wechaty.on('error'    , error =>        this.send({ name: 'error',      payload: error }))
Huan (李卓桓)'s avatar
wip...  
Huan (李卓桓) 已提交
136
    wechaty.on('heartbeat', data  =>        this.send({ name: 'heartbeat',  payload: { cuid: this.cuid, data } }))
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
137 138
    wechaty.on('login',     user =>         this.send({ name: 'login',      payload: user }))
    wechaty.on('logout' ,   user =>         this.send({ name: 'logout',     payload: user }))
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
139
    wechaty.on('message',   message =>      this.ioMessage(message))
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
140
    wechaty.on('scan',      (url, code) =>  this.send({ name: 'scan',       payload: { url, code } }))
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170

    // const hookEvents: WechatyEventName[] = [
    //   'scan'
    //   , 'login'
    //   , 'logout'
    //   , 'heartbeat'
    //   , 'error'
    // ]
    // hookEvents.map(event => {
    //   wechaty.on(event, (data) => {
    //     const ioEvent: IoEvent = {
    //       name:       event
    //       , payload:  data
    //     }

    //     switch (event) {
    //       case 'login':
    //       case 'logout':
    //         if (data instanceof Contact) {
    //           // ioEvent.payload = data.obj
    //           ioEvent.payload = data
    //         }
    //         break

    //       case 'error':
    //         ioEvent.payload = data.toString()
    //         break

        //   case 'heartbeat':
        //     ioEvent.payload = {
Huan (李卓桓)'s avatar
wip...  
Huan (李卓桓) 已提交
171
        //       cuid: this.cuid
172 173 174 175 176 177 178 179 180 181 182
        //       , data: data
        //     }
        //     break

        //   default:
        //     break
        // }

    //     this.send(ioEvent)
    //   })
    // })
183

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
184 185 186 187
    // wechaty.on('message', m => {
    //   const text = (m.room() ? '[' + m.room().topic() + ']' : '')
    //               + '<' + m.from().name() + '>'
    //               + ':' + m.toStringDigest()
188

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
189 190
    //   this.send({ name: 'message', payload:  text })
    // })
191

192
    return
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
193 194
  }

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
195 196
  private initWebSocket() {
    log.verbose('Io', 'initWebSocket()')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
197
    // this.state.current('on', false)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
198 199 200 201 202 203 204 205 206 207 208

    // const auth = 'Basic ' + new Buffer(this.setting.token + ':X').toString('base64')
    const auth = 'Token ' + this.options.token
    const headers = { 'Authorization': auth }

    if (!this.options.apihost) {
      throw new Error('no apihost')
    }
    let endpoint = 'wss://' + this.options.apihost + '/v0/websocket'

    // XXX quick and dirty: use no ssl for APIHOST other than official
209 210
    // FIXME: use a configuarable VARIABLE for the domain name at here:
    if (!/api\.chatie\.io/.test(this.options.apihost)) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
211 212 213 214 215 216 217
      endpoint = 'ws://' + this.options.apihost + '/v0/websocket'
    }

    const ws = this.ws = new WebSocket(endpoint, this.protocol, { headers })

    ws.on('open',     () => this.wsOnOpen(ws))
    ws.on('message',  data => this.wsOnMessage(data))
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
218
    ws.on('error',    e => this.wsOnError(e))
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
219
    ws.on('close',    (code, reason) => this.wsOnClose(ws, code, reason))
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
220 221 222 223

    return ws
  }

ruiruibupt's avatar
ruiruibupt 已提交
224
  private async wsOnOpen(ws: WebSocket): Promise<void> {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
225 226 227 228 229 230
    if (this.protocol !== ws.protocol) {
      log.error('Io', 'initWebSocket() require protocol[%s] failed', this.protocol)
      // XXX deal with error?
    }
    log.verbose('Io', 'initWebSocket() connected with protocol [%s]', ws.protocol)
    // this.currentState('connected')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
231
    // this.state.current('on')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
232 233 234 235

    // FIXME: how to keep alive???
    // ws._socket.setKeepAlive(true, 100)

236
    this.reconnectTimeout = undefined
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
237 238

    const name    = 'sys'
Huan (李卓桓)'s avatar
wip...  
Huan (李卓桓) 已提交
239
    const payload = 'Wechaty version ' + this.options.wechaty.version() + ` with CUID: ${this.cuid}`
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
240 241 242 243 244

    const initEvent: IoEvent = {
      name,
      payload,
    }
ruiruibupt's avatar
ruiruibupt 已提交
245
    await this.send(initEvent)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
246 247
  }

ruiruibupt's avatar
ruiruibupt 已提交
248
  private async wsOnMessage(data: WebSocket.Data) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
    log.silly('Io', 'initWebSocket() ws.on(message): %s', data)
    // flags.binary will be set if a binary data is received.
    // flags.masked will be set if the data was masked.

    if (typeof data !== 'string') {
      throw new Error('data should be string...')
    }

    const ioEvent: IoEvent = {
      name    : 'raw',
      payload : data,
    }

    try {
      const obj = JSON.parse(data)
      ioEvent.name    = obj.name
      ioEvent.payload = obj.payload
    } catch (e) {
      log.verbose('Io', 'on(message) recv a non IoEvent data[%s]', data)
    }

    switch (ioEvent.name) {
      case 'botie':
        const payload = ioEvent.payload
        if (payload.onMessage) {
          const script = payload.script
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
275 276 277 278 279 280 281 282 283 284 285
          try {
            /* tslint:disable:no-eval */
            const fn = eval(script)
            if (typeof fn === 'function') {
              this.onMessage = fn
            } else {
              log.warn('Io', 'server pushed function is invalid')
            }
          } catch (e) {
            log.warn('Io', 'server pushed function exception: %s', e)
            this.options.wechaty.emit('error', e)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
286 287 288 289 290 291
          }
        }
        break

      case 'reset':
        log.verbose('Io', 'on(reset): %s', ioEvent.payload)
ruiruibupt's avatar
ruiruibupt 已提交
292
        await this.options.wechaty.reset(ioEvent.payload)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
293 294 295
        break

      case 'shutdown':
296
        log.info('Io', 'on(shutdown): %s', ioEvent.payload)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
297 298 299 300
        process.exit(0)
        break

      case 'update':
301 302
        log.verbose('Io', 'on(update): %s', ioEvent.payload)

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
303 304 305 306 307 308 309 310 311 312 313 314
        try {
          const userId = this.options.wechaty.puppet.selfId()

          if (userId) {
            const loginEvent: IoEvent = {
              name    : 'login',
              payload : {
                id: userId,
                name: this.options.wechaty.Contact.load(userId).name(),
              },
            }
            await this.send(loginEvent)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
315
          }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
316 317
        } catch (e) {
          // not login
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
318 319
        }

320
        if (this.scanPayload) {
321
          const scanEvent: IoEvent = {
322
            name:     'scan',
323
            payload:  this.scanPayload,
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
324
          }
ruiruibupt's avatar
ruiruibupt 已提交
325
          await this.send(scanEvent)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
326 327 328 329 330 331 332 333
        }

        break

      case 'sys':
        // do nothing
        break

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
334
      case 'logout':
335
        log.info('Io', 'on(logout): %s', ioEvent.payload)
ruiruibupt's avatar
ruiruibupt 已提交
336
        await this.options.wechaty.logout()
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
337 338
        break

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
339 340 341 342 343 344
      default:
        log.warn('Io', 'UNKNOWN on(%s): %s', ioEvent.name, ioEvent.payload)
        break
    }
  }

345 346 347 348
  // FIXME: it seems the parameter `e` might be `undefined`.
  // @types/ws might has bug for `ws.on('error',    e => this.wsOnError(e))`
  private wsOnError(e?: Error) {
    log.warn('Io', 'initWebSocket() error event[%s]', e && e.message)
349 350 351
    if (!e) {
      return
    }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373
    this.options.wechaty.emit('error', e)

    // when `error`, there must have already a `close` event
    // we should not call this.reconnect() again
    //
    // this.close()
    // this.reconnect()
  }

  private wsOnClose(
    ws      : WebSocket,
    code    : number,
    message : string,
  ): void {
    log.warn('Io', 'initWebSocket() close event[%d: %s]', code, message)
    ws.close()
    this.reconnect()
  }

  private reconnect() {
    log.verbose('Io', 'reconnect()')

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
374
    if (this.state.off()) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395
      log.warn('Io', 'reconnect() canceled because state.target() === offline')
      return
    }

    if (this.connected()) {
      log.warn('Io', 'reconnect() on a already connected io')
      return
    }
    if (this.reconnectTimer) {
      log.warn('Io', 'reconnect() on a already re-connecting io')
      return
    }

    if (!this.reconnectTimeout) {
      this.reconnectTimeout = 1
    } else if (this.reconnectTimeout < 10 * 1000) {
      this.reconnectTimeout *= 3
    }

    log.warn('Io', 'reconnect() will reconnect after %d s', Math.floor(this.reconnectTimeout / 1000))
    this.reconnectTimer = setTimeout(_ => {
396
      this.reconnectTimer = undefined
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
397 398 399 400
      this.initWebSocket()
    }, this.reconnectTimeout)// as any as NodeJS.Timer
  }

401
  private async send(ioEvent?: IoEvent): Promise<void> {
402 403 404 405 406 407
    if (!this.ws) {
      throw new Error('no ws')
    }

    const ws = this.ws

408 409
    if (ioEvent) {
      log.silly('Io', 'send(%s: %s)', ioEvent.name, ioEvent.payload)
410
      this.eventBuffer.push(ioEvent)
411
    } else { log.silly('Io', 'send()') }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
412 413

    if (!this.connected()) {
414
      log.verbose('Io', 'send() without a connected websocket, eventBuffer.length = %d', this.eventBuffer.length)
415
      return
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
416 417
    }

418
    const list: Promise<any>[] = []
419
    while (this.eventBuffer.length) {
420
      const p = new Promise((resolve, reject) => ws.send(
421
        JSON.stringify(
L
lijiarui 已提交
422 423 424
          this.eventBuffer.shift(),
        ),
        (err: Error) => {
425 426
          if (err)  { reject(err) }
          else      { resolve()   }
L
lijiarui 已提交
427
        },
428 429 430 431 432 433 434 435 436
      ))
      list.push(p)
    }

    try {
      await Promise.all(list)
    } catch (e) {
      log.error('Io', 'send() exceptio: %s', e.stack)
      throw e
437
    }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
438
  }
439

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
440
  public async quit(): Promise<void> {
441 442 443 444
    if (!this.ws) {
      throw new Error('no ws')
    }

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
445
    this.state.off('pending')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
446

447
    // try to send IoEvents in buffer
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
448
    await this.send()
449
    this.eventBuffer = []
450

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
451 452
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
453
      this.reconnectTimer = undefined
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
454
    }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
455 456

    this.ws.close()
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
457

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
458
    this.state.off(true)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
459

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
460
    return
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
461
  }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
462 463 464 465 466
  /**
   *
   * Prepare to be overwriten by server setting
   *
   */
467
  private async ioMessage(m: Message): Promise<void> {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
468
    log.silly('Io', 'ioMessage() is a nop function before be overwriten from cloud')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
469 470 471
    if (typeof this.onMessage === 'function') {
      await this.onMessage(m)
    }
472
  }
473

474
}
475 476

export default Io