io.ts 11.9 KB
Newer Older
1 2
/**
 *
3
 * Wechaty: Wechat for Bot. and for human who talk to bot/robot
4
 *
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
5
 * Class Io
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
6
 * http://www.wechaty.io
7 8 9 10 11
 *
 * Licenst: ISC
 * https://github.com/zixia/wechaty
 *
 */
12
import * as WebSocket from 'ws'
13

14 15 16
import {
    Config
  // WechatyEventName
17
  , log
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
18
}                   from './config'
19

20 21
import { StateMonitor } from './state-monitor'
import { Wechaty }      from './wechaty'
22

23
export type IoSetting = {
24 25
  wechaty:    Wechaty
  token:      string
26 27
  apihost?:   string
  protocol?:  string
28 29
}

30 31 32 33 34 35 36 37
type IoEventName =  'botie'
                  | 'error'
                  | 'heartbeat'
                  | 'login'
                  | 'message'
                  | 'raw'
                  | 'reset'
                  | 'scan'
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
38
                  | 'sys'
39 40
                  | 'shutdown'

41
type IoEvent = {
42
  name:     IoEventName
43 44
  payload:  any
}
45

46
export class Io {
47 48 49
  public uuid: string

  private protocol: string
50
  private eventBuffer: IoEvent[] = []
51 52
  private ws: WebSocket

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
53 54 55
  // private _currentState: string
  // private _targetState: string
  private state = new StateMonitor<'online', 'offline'>('Io', 'offline')
56

57
  private reconnectTimer: NodeJS.Timer | null
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
58
  private reconnectTimeout: number | null
59

60 61 62 63
  private onMessage: Function

  constructor(private setting: IoSetting) {
    if (!setting.wechaty || !setting.token) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
64
      throw new Error('Io must has wechaty & token set')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
65
    }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
66

67 68
    setting.apihost   = setting.apihost   || Config.apihost
    setting.protocol  = setting.protocol  || Config.DEFAULT_PROTOCOL
69

70
    this.uuid     = setting.wechaty.uuid
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
71

72 73 74 75 76 77 78
    this.protocol = setting.protocol + '|' + setting.wechaty.uuid
    log.verbose('Io', 'instantiated with apihost[%s], token[%s], protocol[%s], uuid[%s]'
              , setting.apihost
              , setting.token
              , setting.protocol
              , this.uuid
              )
79

80
    // this.purpose('offline')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
81 82
    // this.targetState('disconnected')
    // this.currentState('disconnected')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
83 84
    // this.state.target('offline')
    // this.state.current('offline')
85 86
  }

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
  // // targetState : 'connected' | 'disconnected'
  // private targetState(newState?) {
  //   if (newState) {
  //     log.verbose('Io', 'targetState(%s)', newState)
  //     this._targetState = newState
  //   }
  //   return this._targetState
  // }

  // // currentState : 'connecting' | 'connected' | 'disconnecting' | 'disconnected'
  // private currentState(newState?) {
  //   if (newState) {
  //     log.verbose('Io', 'currentState(%s)', newState)
  //     this._currentState = newState
  //   }
  //   return this._currentState
  // }
104

105
  public toString() { return 'Class Io(' + this.setting.token + ')'}
106

107
  private connected() { return this.ws && this.ws.readyState === WebSocket.OPEN }
108

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
109
  public async init(): Promise<void> {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
110
    log.verbose('Io', 'init()')
111

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
112 113 114 115
    // this.targetState('connected')
    // this.currentState('connecting')
    this.state.target('online')
    this.state.current('online', false)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
116

117 118 119
    try {
      await this.initEventHook()
      await this.initWebSocket()
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
120

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
121 122 123
      // this.currentState('connected')
      this.state.current('online')

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
124
      return
125
    } catch (e) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
126
      log.warn('Io', 'init() exception: %s', e.message)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
127 128
      // this.currentState('disconnected')
      this.state.current('offline')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
129
      throw e
130
    }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
131 132
  }

133
  private initWebSocket() {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
134
    log.verbose('Io', 'initWebSocket()')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
135 136
    // this.currentState('connecting')
    this.state.current('online', false)
137

138 139
    // const auth = 'Basic ' + new Buffer(this.setting.token + ':X').toString('base64')
    const auth = 'Token ' + this.setting.token
140 141
    const headers = { 'Authorization': auth }

142 143 144
    if (!this.setting.apihost) {
      throw new Error('no apihost')
    }
145
    let endpoint = 'wss://' + this.setting.apihost + '/v0/websocket'
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
146 147

    // XXX quick and dirty: use no ssl for APIHOST other than official
148 149
    if (!/api\.wechaty\.io/.test(this.setting.apihost)) {
      endpoint = 'ws://' + this.setting.apihost + '/v0/websocket'
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
150 151 152
    }

    const ws = this.ws = new WebSocket(endpoint, this.protocol, { headers })
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
153

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
154
    ws.on('open', () => {
155
      if (this.protocol !== ws.protocol) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
156
        log.error('Io', 'initWebSocket() require protocol[%s] failed', this.protocol)
157 158
        // XXX deal with error?
      }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
159
      log.verbose('Io', 'initWebSocket() connected with protocol [%s]', ws.protocol)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
160 161
      // this.currentState('connected')
      this.state.current('online')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
162

163
      // FIXME: how to keep alive???
164
      // ws._socket.setKeepAlive(true, 100)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
165

166
      this.reconnectTimeout = null
167

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
168
      const initEvent = <IoEvent>{
169
        name: 'sys'
170
        , payload: 'Wechaty version ' + this.setting.wechaty.version() + ` with UUID: ${this.uuid}`
171 172
      }
      this.send(initEvent)
173

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
174
    })
175

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
176
    ws.on('message', (data, flags) => {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
177
      log.silly('Io', 'initWebSocket() ws.on(message): %s', data)
178 179
      // flags.binary will be set if a binary data is received.
      // flags.masked will be set if the data was masked.
180

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
181 182 183 184
      const ioEvent = {
        name: 'raw'
        , payload: data
      }
185

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
186 187 188 189 190 191 192 193 194 195 196 197 198
      try {
        const obj = JSON.parse(data)
        ioEvent.name    = obj.name
        ioEvent.payload = obj.payload
      } catch (e) {
        log.verbose('Io', 'on(message) recv a non IoEvent data[%s]', data)
      }

      switch (ioEvent.name) {
        case 'botie':
          const payload = ioEvent.payload
          if (payload.onMessage) {
            const script = payload.script
199
            /* tslint:disable:no-eval */
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
200 201 202 203 204 205 206 207
            const fn = eval(script)
            if (typeof fn === 'function') {
              this.onMessage = fn
            } else {
              log.warn('Io', 'server pushed function is invalid')
            }
          }
          break
208

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
209 210
        case 'reset':
          log.verbose('Io', 'on(reset): %s', ioEvent.payload)
211
          this.setting.wechaty.reset(ioEvent.payload)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
212
          break
213

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
214 215 216 217 218
        case 'shutdown':
          log.warn('Io', 'on(shutdown): %s', ioEvent.payload)
          process.exit(0)
          break

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
219 220
        case 'update':
          log.verbose('Io', 'on(report): %s', ioEvent.payload)
221
          const user = this.setting.wechaty.user()
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
222
          if (user) {
223
            const loginEvent: IoEvent = {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
224
              name:       'login'
225 226
              // , payload:  user.obj
              , payload:  user
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
227 228
            }
            this.send(loginEvent)
229 230
          }

231 232 233 234
          // XXX: Puppet should not has `scan` variable ...
          const scan = this.setting.wechaty
                        && this.setting.wechaty.puppet
                        && this.setting.wechaty.puppet['scan']
235
          if (scan) {
236
            const scanEvent: IoEvent = {
237 238 239 240 241
              name: 'scan'
              , payload: scan
            }
            this.send(scanEvent)
          }
242

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
243
          break
244

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
245 246 247
        case 'sys':
          // do nothing
          break
248

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
249 250 251
        default:
          log.warn('Io', 'UNKNOWN on(%s): %s', ioEvent.name, ioEvent.payload)
          break
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
252
      }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
253
    })
254

255 256
    ws.on('error', e => {
      log.warn('Io', 'initWebSocket() error event[%s]', e.message)
257
      this.setting.wechaty.emit('error', e)
258

259 260 261 262 263 264 265 266 267
      // when `error`, there must have already a `close` event
      // we should not call this.reconnect() again
      //
      // this.close()
      // this.reconnect()
    })
    .on('close', (code, message) => {
      log.warn('Io', 'initWebSocket() close event[%d: %s]', code, message)
      ws.close()
268
      this.reconnect()
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
269 270
    })

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
271
    return Promise.resolve(ws)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
272 273
  }

274
  private reconnect() {
Huan (李卓桓)'s avatar
log  
Huan (李卓桓) 已提交
275 276
    log.verbose('Io', 'reconnect()')

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
277 278 279
    // if (this.targetState() === 'disconnected') {
    if (this.state.target() === 'offline') {
      log.verbose('Io', 'reconnect() canceled because state.target() === offline')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
280 281 282
      return
    }

283
    if (this.connected()) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
284
      log.warn('Io', 'reconnect() on a already connected io')
285 286 287
      return
    }
    if (this.reconnectTimer) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
288
      log.warn('Io', 'reconnect() on a already re-connecting io')
289 290 291 292 293 294 295
      return
    }

    if (!this.reconnectTimeout) {
      this.reconnectTimeout = 1
    } else if (this.reconnectTimeout < 10000) {
      this.reconnectTimeout *= 3
296
    }
297

298
    log.warn('Io', 'reconnect() will reconnect after %d s', Math.floor(this.reconnectTimeout / 1000))
299 300 301
    this.reconnectTimer = setTimeout(_ => {
      this.reconnectTimer = null
      this.initWebSocket()
302
    }, this.reconnectTimeout) as any as NodeJS.Timer
303 304
  }

305
  private initEventHook() {
Huan (李卓桓)'s avatar
log  
Huan (李卓桓) 已提交
306
    log.verbose('Io', 'initEventHook()')
307
    const wechaty = this.setting.wechaty
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
308 309 310

    wechaty.on('message', this.ioMessage)

311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359
    wechaty.on('scan', (url, code) => this.send({ name: 'scan', payload: { url, code } }))

    wechaty.on('login'  , user => this.send({ name: 'login', payload: user }))
    wechaty.on('logout' , user => this.send({ name: 'login', payload: user }))

    wechaty.on('heartbeat', data  => this.send({ name: 'heartbeat', payload: { uuid: this.uuid, data } }))
    wechaty.on('error'    , error => this.send({ name: 'error', payload: error }))

    // const hookEvents: WechatyEventName[] = [
    //   'scan'
    //   , 'login'
    //   , 'logout'
    //   , 'heartbeat'
    //   , 'error'
    // ]
    // hookEvents.map(event => {
    //   wechaty.on(event, (data) => {
    //     const ioEvent: IoEvent = {
    //       name:       event
    //       , payload:  data
    //     }

    //     switch (event) {
    //       case 'login':
    //       case 'logout':
    //         if (data instanceof Contact) {
    //           // ioEvent.payload = data.obj
    //           ioEvent.payload = data
    //         }
    //         break

    //       case 'error':
    //         ioEvent.payload = data.toString()
    //         break

        //   case 'heartbeat':
        //     ioEvent.payload = {
        //       uuid: this.uuid
        //       , data: data
        //     }
        //     break

        //   default:
        //     break
        // }

    //     this.send(ioEvent)
    //   })
    // })
360

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
361 362 363 364
    // wechaty.on('message', m => {
    //   const text = (m.room() ? '[' + m.room().topic() + ']' : '')
    //               + '<' + m.from().name() + '>'
    //               + ':' + m.toStringDigest()
365

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
366 367
    //   this.send({ name: 'message', payload:  text })
    // })
368

369
    return
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
370 371
  }

372
  private async send(ioEvent?: IoEvent): Promise<void> {
373 374
    if (ioEvent) {
      log.silly('Io', 'send(%s: %s)', ioEvent.name, ioEvent.payload)
375
      this.eventBuffer.push(ioEvent)
376
    } else { log.silly('Io', 'send()') }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
377 378

    if (!this.connected()) {
379
      log.verbose('Io', 'send() without a connected websocket, eventBuffer.length = %d', this.eventBuffer.length)
380
      return
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
381 382
    }

383
    const list: Promise<any>[] = []
384
    while (this.eventBuffer.length) {
385
      const p = new Promise((resolve, reject) => this.ws.send(
386
        JSON.stringify(
387
          this.eventBuffer.shift()
388
        )
389 390 391 392 393 394 395 396 397 398 399 400 401
        , (err: Error) => {
          if (err)  { reject(err) }
          else      { resolve()   }
        }
      ))
      list.push(p)
    }

    try {
      await Promise.all(list)
    } catch (e) {
      log.error('Io', 'send() exceptio: %s', e.stack)
      throw e
402
    }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
403
  }
404

405
  private close() {
Huan (李卓桓)'s avatar
log  
Huan (李卓桓) 已提交
406
    log.verbose('Io', 'close()')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
407 408 409 410
    // this.targetState('disconnected')
    // this.currentState('disconnecting')
    this.state.target('offline')
    this.state.current('offline', false)
Huan (李卓桓)'s avatar
log  
Huan (李卓桓) 已提交
411

412
    this.ws.close()
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
413 414 415
    // this.currentState('disconnected')
    this.state.current('offline')

416
    // TODO: remove listener for this.setting.wechaty.on(message )
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
417
    return Promise.resolve()
418 419
  }

420
  public quit() {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
421 422 423 424
    // this.targetState('disconnected')
    // this.currentState('disconnecting')
    this.state.target('offline')
    this.state.current('offline', false)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
425

426 427
    // try to send IoEvents in buffer
    this.send()
428
    this.eventBuffer = []
429

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
430 431
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
432
      this.reconnectTimer = null
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
433 434 435
    }
    this.close()

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
436 437 438
    // this.currentState('disconnected')
    this.state.current('offline')

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
439 440
    return Promise.resolve()
  }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
441 442 443 444 445
  /**
   *
   * Prepare to be overwriten by server setting
   *
   */
446
  private ioMessage(m) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
447
    log.verbose('Io', 'ioMessage() is a nop function before be overwriten from cloud')
448
  }
449

450
}