/**
 *   Wechaty - https://github.com/chatie/wechaty
 *
 *   @copyright 2016-2018 Huan LI <zixia@zixia.net>
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 *
 */
19
import * as WebSocket from 'ws'
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
20
import StateSwitch    from 'state-switch'
21

22
import {
23
  Message,
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
24 25 26
}             from './message'

import {
27
  ScanData,
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
28
}             from './puppet/'
29

30
import {
31
  config,
L
lijiarui 已提交
32
  log,
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
33
}                 from './config'
34 35 36
import {
  Wechaty,
}                 from './wechaty'
37

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
38
/**
 * Construction options for `Io`.
 */
export interface IoOptions {
  /** The Wechaty instance whose events are forwarded and which receives server commands. */
  wechaty:    Wechaty,
  /** Token sent to the server in the `Authorization: Token <token>` header. */
  token:      string,
  /** Host of the websocket endpoint; the `Io` constructor falls back to `config.apihost`. */
  apihost?:   string,
  /** Sub-protocol prefix; the `Io` constructor falls back to `config.default.DEFAULT_PROTOCOL`. */
  protocol?:  string,
}

45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
/**
 * Dictionary of every event name that can travel over the Io websocket.
 *
 * Within this file only the keys are consumed (they form `IoEventName`);
 * every value is the placeholder string 'tbw' (presumably "to be written").
 */
export const IO_EVENT_DICT = {
  botie     : 'tbw',
  error     : 'tbw',
  heartbeat : 'tbw',
  login     : 'tbw',
  logout    : 'tbw',
  message   : 'tbw',
  update    : 'tbw',
  raw       : 'tbw',
  reset     : 'tbw',
  scan      : 'tbw',
  sys       : 'tbw',
  shutdown  : 'tbw',
}

/** Union of the keys of `IO_EVENT_DICT`. */
type IoEventName = keyof typeof IO_EVENT_DICT
61

62
/**
 * One message exchanged with the Io server; it is serialized with
 * `JSON.stringify` on the way out and rebuilt from `JSON.parse` on the way in.
 */
interface IoEvent {
  /** Event kind, one of the keys of `IO_EVENT_DICT`. */
  name:     IoEventName,
  /** Event-specific data; its shape depends on `name` and is not checked here. */
  payload:  any,
}
66

67
/**
 * Io links a Wechaty instance to a remote server over a WebSocket.
 *
 * Outbound: wechaty events (`error`, `heartbeat`, `login`, `logout`, `scan`)
 * are wrapped as `IoEvent`s, buffered, and written to the socket as JSON.
 * Inbound: JSON `IoEvent`s from the server are dispatched in `wsOnMessage()`
 * (`botie`, `reset`, `shutdown`, `update`, `sys`, `logout`).
 * When the socket closes while the state is not "off", a reconnect is
 * scheduled with a growing delay.
 */
export class Io {
  private readonly cuid     : string
  private readonly protocol : string

  /** Events waiting to be written to the socket; drained by `send()` once connected. */
  private eventBuffer : IoEvent[] = []
  private ws          : undefined | WebSocket

  private readonly state = new StateSwitch('Io', log)

  private reconnectTimer?   : NodeJS.Timer
  /** Current reconnect delay in milliseconds; reset to `undefined` on every successful open. */
  private reconnectTimeout? : number

  /** Message handler produced by a server-pushed `botie` script; unset until one arrives. */
  private onMessage: undefined | ((m: Message) => any)

  /** Last `url` / `code` seen on the wechaty `scan` event; replayed when the server asks for `update`. */
  private scanData: ScanData

  /**
   * @param options see `IoOptions`. NOTE: `apihost` and `protocol` are defaulted
   *                in place, i.e. the caller's object is mutated.
   */
  constructor(
    private options: IoOptions,
  ) {
    options.apihost   = options.apihost   || config.apihost
    options.protocol  = options.protocol  || config.default.DEFAULT_PROTOCOL

    this.cuid = options.wechaty.cuid

    this.scanData = {} as any

    // the websocket sub-protocol carries both the protocol name and the cuid
    this.protocol = options.protocol + '|' + options.wechaty.cuid

    // NOTE(review): the token is written to the verbose log here - confirm this is acceptable.
    log.verbose('Io', 'instantiated with apihost[%s], token[%s], protocol[%s], cuid[%s]',
                      options.apihost,
                      options.token,
                      options.protocol,
                      this.cuid,
              )
  }

  public toString() {
    return `Io<${this.options.token}>`
  }

  /**
   * @returns `true` only when a socket exists and its readyState is OPEN.
   */
  private connected(): boolean {
    return !!this.ws && this.ws.readyState === WebSocket.OPEN
  }

  /**
   * Hook the wechaty events, open the websocket and switch the state to "on".
   *
   * @throws re-throws whatever the setup threw, after switching the state to "off".
   */
  public async init(): Promise<void> {
    log.verbose('Io', 'init()')

    this.state.on('pending')

    try {
      await this.initEventHook()
      this.ws = this.initWebSocket()

      // remember the latest scan data so it can be replayed on a server `update` request
      this.options.wechaty.on('scan', (url, code) => {
        this.scanData.url   = url
        this.scanData.code  = code
      })
      this.state.on(true)

      return
    } catch (e) {
      log.warn('Io', 'init() exception: %s', e.message)
      this.state.off(true)
      throw e
    }
  }

  /**
   * Forward wechaty events to the server and route incoming chat messages
   * to `ioMessage()`.
   */
  private initEventHook() {
    log.verbose('Io', 'initEventHook()')
    const wechaty = this.options.wechaty

    // NOTE(review): if `error` is an Error instance, JSON.stringify() in send()
    // serializes it as `{}` (message/stack are not enumerable) - confirm what the server expects.
    wechaty.on('error'    , error =>        this.sendQuietly({ name: 'error',      payload: error }))
    wechaty.on('heartbeat', data  =>        this.sendQuietly({ name: 'heartbeat',  payload: { cuid: this.cuid, data } }))
    wechaty.on('login',     user =>         this.sendQuietly({ name: 'login',      payload: user }))
    wechaty.on('logout' ,   user =>         this.sendQuietly({ name: 'logout',     payload: user }))
    wechaty.on('scan',      (url, code) =>  this.sendQuietly({ name: 'scan',       payload: { url, code } }))

    wechaty.on('message',   message => {
      // ioMessage() runs server-pushed code: never let its rejection go unhandled
      this.ioMessage(message).catch(e => {
        log.warn('Io', 'ioMessage() rejected: %s', e && e.message)
      })
    })

    return
  }

  /**
   * Fire-and-forget variant of `send()` for event listeners: a rejection is
   * logged instead of surfacing as an unhandled promise rejection.
   */
  private sendQuietly(ioEvent: IoEvent): void {
    this.send(ioEvent).catch(e => {
      log.warn('Io', 'send(%s) rejected: %s', ioEvent.name, e && e.message)
    })
  }

  /**
   * Create the websocket, store it in `this.ws` and attach the ws listeners.
   *
   * @returns the newly created socket
   * @throws Error('no apihost') when `options.apihost` is empty
   */
  private initWebSocket() {
    log.verbose('Io', 'initWebSocket()')

    const auth = 'Token ' + this.options.token
    const headers = { 'Authorization': auth }

    const apihost = this.options.apihost
    if (!apihost) {
      throw new Error('no apihost')
    }

    // XXX quick and dirty: use no ssl for APIHOST other than official
    // FIXME: use a configurable VARIABLE for the domain name here
    const scheme   = /api\.chatie\.io/.test(apihost) ? 'wss://' : 'ws://'
    const endpoint = scheme + apihost + '/v0/websocket'

    const ws = this.ws = new WebSocket(endpoint, this.protocol, { headers })

    ws.on('open',     () => this.wsOnOpen(ws))
    ws.on('message',  data => this.wsOnMessage(data))
    ws.on('error',    e => this.wsOnError(e))
    ws.on('close',    (code, reason) => this.wsOnClose(ws, code, reason))

    return ws
  }

  /**
   * ws `open` listener: reset the reconnect delay and announce ourselves with
   * a `sys` event (sending it also flushes anything already buffered).
   */
  private wsOnOpen(ws: WebSocket): void {
    if (this.protocol !== ws.protocol) {
      log.error('Io', 'initWebSocket() require protocol[%s] failed', this.protocol)
      // XXX deal with error?
    }
    log.verbose('Io', 'initWebSocket() connected with protocol [%s]', ws.protocol)

    // FIXME: how to keep alive???
    // ws._socket.setKeepAlive(true, 100)

    this.reconnectTimeout = undefined

    const initEvent: IoEvent = {
      name    : 'sys',
      payload : 'Wechaty version ' + this.options.wechaty.version() + ` with CUID: ${this.cuid}`,
    }
    this.sendQuietly(initEvent)
  }

  /**
   * ws `message` listener: decode the frame into an `IoEvent` and act on it.
   *
   * Data that is not valid JSON is kept as a `raw` event and ends in the
   * "UNKNOWN" branch. Non-string data is reported through the wechaty `error`
   * event instead of being thrown, because a throw inside a ws listener would
   * become an uncaught exception.
   */
  private wsOnMessage(data: WebSocket.Data) {
    log.silly('Io', 'initWebSocket() ws.on(message): %s', data)

    if (typeof data !== 'string') {
      const e = new Error('data should be string...')
      log.error('Io', 'on(message) %s', e.message)
      this.options.wechaty.emit('error', e)
      return
    }

    const ioEvent: IoEvent = {
      name    : 'raw',
      payload : data,
    }

    try {
      const obj = JSON.parse(data)
      ioEvent.name    = obj.name
      ioEvent.payload = obj.payload
    } catch (e) {
      log.verbose('Io', 'on(message) recv a non IoEvent data[%s]', data)
    }

    switch (ioEvent.name) {
      case 'botie': {
        const payload = ioEvent.payload
        // guard: a `botie` event without payload must not crash the listener
        if (payload && payload.onMessage) {
          const script = payload.script
          try {
            // SECURITY: this evaluates code received from the server with the
            // full privileges of this process. It is the purpose of `botie`,
            // but it means the endpoint and the transport must be trusted.
            /* tslint:disable:no-eval */
            const fn = eval(script)
            if (typeof fn === 'function') {
              this.onMessage = fn
            } else {
              log.warn('Io', 'server pushed function is invalid')
            }
          } catch (e) {
            log.warn('Io', 'server pushed function exception: %s', e)
            this.options.wechaty.emit('error', e)
          }
        }
        break
      }

      case 'reset':
        log.verbose('Io', 'on(reset): %s', ioEvent.payload)
        this.options.wechaty.reset(ioEvent.payload)
        break

      case 'shutdown':
        log.info('Io', 'on(shutdown): %s', ioEvent.payload)
        process.exit(0)
        break

      case 'update': {
        log.verbose('Io', 'on(update): %s', ioEvent.payload)

        // replay the current login state, if any
        const user = this.options.wechaty.puppet.userSelf()

        if (user) {
          const loginEvent: IoEvent = {
            name    : 'login',
            payload : {
              id: user.id,
              name: user.name(),
            },
          }
          this.sendQuietly(loginEvent)
        }

        // NOTE(review): scanData is initialised to `{}` in the constructor, so
        // this test is always true and an empty payload may be sent before any
        // scan has happened - confirm whether that is intended.
        if (this.scanData) {
          const scanEvent: IoEvent = {
            name:     'scan',
            payload:  this.scanData,
          }
          this.sendQuietly(scanEvent)
        }

        break
      }

      case 'sys':
        // do nothing
        break

      case 'logout':
        log.info('Io', 'on(logout): %s', ioEvent.payload)
        this.options.wechaty.logout()
        break

      default:
        log.warn('Io', 'UNKNOWN on(%s): %s', ioEvent.name, ioEvent.payload)
        break
    }
  }

  // FIXME: it seems the parameter `e` might be `undefined`.
  // @types/ws might have a bug for `ws.on('error',    e => this.wsOnError(e))`
  /**
   * ws `error` listener: log and re-emit on wechaty. No reconnect is started
   * here because a `close` event is expected to follow an `error`.
   */
  private wsOnError(e?: Error) {
    log.warn('Io', 'initWebSocket() error event[%s]', e && e.message)
    this.options.wechaty.emit('error', e)
  }

  /**
   * ws `close` listener: close the socket and try to reconnect.
   */
  private wsOnClose(
    ws      : WebSocket,
    code    : number,
    message : string,
  ): void {
    log.warn('Io', 'initWebSocket() close event[%d: %s]', code, message)
    ws.close()
    this.reconnect()
  }

  /**
   * Schedule a new `initWebSocket()` unless the state is "off", the socket is
   * already open, or a reconnect is already pending.
   *
   * The delay starts at 1 ms and is tripled on each further attempt while it
   * is below 10 seconds; `wsOnOpen()` resets it.
   */
  private reconnect() {
    log.verbose('Io', 'reconnect()')

    if (this.state.off()) {
      log.warn('Io', 'reconnect() canceled because state.target() === offline')
      return
    }

    if (this.connected()) {
      log.warn('Io', 'reconnect() on a already connected io')
      return
    }
    if (this.reconnectTimer) {
      log.warn('Io', 'reconnect() on a already re-connecting io')
      return
    }

    if (!this.reconnectTimeout) {
      this.reconnectTimeout = 1
    } else if (this.reconnectTimeout < 10 * 1000) {
      this.reconnectTimeout *= 3
    }

    log.warn('Io', 'reconnect() will reconnect after %d s', Math.floor(this.reconnectTimeout / 1000))
    this.reconnectTimer = setTimeout(_ => {
      this.reconnectTimer = undefined
      this.initWebSocket()
    }, this.reconnectTimeout)// as any as NodeJS.Timer
  }

  /**
   * Queue `ioEvent` (if given) and, when the socket is open, write every
   * buffered event to it in order.
   *
   * When the socket is not open the events simply stay in the buffer.
   *
   * @param ioEvent event to queue; omit it to only flush the buffer
   * @throws (as a rejection) Error('no ws') when no socket was ever created,
   *         or the error reported by `ws.send()`; events handed to a failed
   *         write are not re-queued.
   */
  private async send(ioEvent?: IoEvent): Promise<void> {
    if (!this.ws) {
      throw new Error('no ws')
    }

    const ws = this.ws

    if (ioEvent) {
      log.silly('Io', 'send(%s: %s)', ioEvent.name, ioEvent.payload)
      this.eventBuffer.push(ioEvent)
    } else { log.silly('Io', 'send()') }

    if (!this.connected()) {
      log.verbose('Io', 'send() without a connected websocket, eventBuffer.length = %d', this.eventBuffer.length)
      return
    }

    // take everything out of the buffer at once and start all writes synchronously
    const list: Promise<void>[] = this.eventBuffer
      .splice(0)
      .map(event => new Promise<void>((resolve, reject) => ws.send(
        JSON.stringify(event),
        (err?: Error) => {
          if (err)  { reject(err) }
          else      { resolve()   }
        },
      )))

    try {
      await Promise.all(list)
    } catch (e) {
      log.error('Io', 'send() exception: %s', e.stack)
      throw e
    }
  }

  /**
   * Flush the buffer, cancel any pending reconnect, close the socket and
   * switch the state to "off".
   *
   * The cleanup runs even when the final flush fails, so the instance never
   * stays in the "off pending" state with an open socket.
   *
   * @throws (as a rejection) Error('no ws') when called before a socket exists,
   *         or the error of the final flush (after the cleanup has run).
   */
  public async quit(): Promise<void> {
    if (!this.ws) {
      throw new Error('no ws')
    }

    this.state.off('pending')

    try {
      // try to send IoEvents in buffer
      await this.send()
    } finally {
      this.eventBuffer = []

      if (this.reconnectTimer) {
        clearTimeout(this.reconnectTimer)
        this.reconnectTimer = undefined
      }

      if (this.ws) {
        this.ws.close()
      }

      this.state.off(true)
    }

    return
  }

  /**
   *
   * Prepare to be overwritten by server setting:
   * a no-op until a `botie` event installs `onMessage`, then delegates to it.
   *
   */
  private async ioMessage(m: Message): Promise<void> {
    log.silly('Io', 'ioMessage() is a nop function before be overwriten from cloud')
    if (typeof this.onMessage === 'function') {
      await this.onMessage(m)
    }
  }

}

export default Io