io.ts 10.5 KB
Newer Older
1 2 3 4
/**
 *
 * wechaty: Wechat for Bot. and for human who talk to bot/robot
 *
 * Class Io
 * http://www.wechaty.io
 *
 * License: ISC
 * https://github.com/zixia/wechaty
 *
 */
12 13
// const EventEmitter  = require('events')
import * as WebSocket from 'ws'
14
// const co            = require('co')
15

16 17 18 19
import Config   from './config'
import Contact  from './contact'
import Wechaty  from './wechaty'
import log      from './brolog-env'
20

21 22 23
/**
 * Options accepted by the `Io` constructor.
 *
 * `wechaty` and `token` are required; `apihost` and `protocol` may be
 * omitted.
 */
interface IoSetting {
  wechaty: Wechaty
  token: string
  apihost?: string
  protocol?: string
}

/**
 * Envelope for everything exchanged over the Io WebSocket:
 * an event name plus an arbitrary payload.
 */
interface IoEvent {
  name: string
  payload: any
}
32

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
33
class Io {
34 35 36 37 38 39 40 41 42 43 44
  /** Copied from `setting.wechaty.uuid` in the constructor. */
  public uuid: string

  /** WebSocket sub-protocol string, built as `<setting.protocol>|<uuid>`. */
  private protocol: string
  /** Outgoing events queued until `send()` can flush them to the socket. */
  private _eventBuffer: IoEvent[] = []
  /** The socket created by the most recent `initWebSocket()` call. */
  private ws: WebSocket

  /** Backing field of `currentState()`. */
  private _currentState: string
  /** Backing field of `targetState()`. */
  private _targetState: string

  /** Handle of the pending reconnect timer; falsy when none is scheduled. */
  private reconnectTimer: NodeJS.Timer
  /** Delay, in milliseconds, used for the next reconnect attempt. */
  private reconnectTimeout: number

  /** Function received from the server through a 'botie' event. */
  private onMessage: Function

  /**
   * Create an Io bound to a Wechaty instance.
   *
   * Missing `apihost` / `protocol` values are filled in from `Config` and
   * written back onto the passed-in setting object. The instance starts
   * with both target and current state set to 'disconnected'.
   *
   * @param setting - must carry both `wechaty` and `token`
   * @throws Error when `wechaty` or `token` is missing
   */
  constructor(private setting: IoSetting) {
    const hasRequired = setting.wechaty && setting.token
    if (!hasRequired) {
      throw new Error('Io must has wechaty & token set')
    }

    // fall back to the configured defaults (mutates the caller's object)
    if (!setting.apihost) {
      setting.apihost = Config.apihost
    }
    if (!setting.protocol) {
      setting.protocol = Config.DEFAULT_PROTOCOL
    }

    const uuid = setting.wechaty.uuid
    this.uuid     = uuid
    this.protocol = setting.protocol + '|' + uuid

    log.verbose(
      'Io',
      'instantiated with apihost[%s], token[%s], protocol[%s], uuid[%s]',
      setting.apihost,
      setting.token,
      setting.protocol,
      this.uuid,
    )

    this.targetState('disconnected')
    this.currentState('disconnected')
  }

71
  /**
   * Read, or update and read, the state this Io is trying to reach.
   *
   * targetState : 'connected' | 'disconnected'
   *
   * @param newState - when truthy, logged and stored as the new target state
   * @returns the target state after any update
   */
  private targetState(newState?) {
    if (!newState) {
      return this._targetState
    }

    log.verbose('Io', 'targetState(%s)', newState)
    this._targetState = newState
    return this._targetState
  }

80
  /**
   * Read, or update and read, the state this Io is in right now.
   *
   * currentState : 'connecting' | 'connected' | 'disconnecting' | 'disconnected'
   *
   * @param newState - when truthy, logged and stored as the new current state
   * @returns the current state after any update
   */
  private currentState(newState?) {
    if (!newState) {
      return this._currentState
    }

    log.verbose('Io', 'currentState(%s)', newState)
    this._currentState = newState
    return this._currentState
  }

  // purpose(newPurpose) {
  //   if (newPurpose) {
  //     this._purpose = newPurpose
  //   }
  //   return this._purpose
  // }

96
  /** Human-readable identity of this Io; includes the configured token. */
  public toString() {
    return `Class Io(${this.setting.token})`
  }
97

98
  /**
   * Whether a socket exists and its ready state is OPEN.
   * When no socket has been created yet, the falsy `ws` value itself is
   * returned.
   */
  private connected() {
    const socket = this.ws
    return socket && socket.readyState === WebSocket.OPEN
  }
99

100
  /**
   * Bring the Io online: hook the wechaty events, then create the WebSocket.
   *
   * The target state becomes 'connected'. On success the current state is
   * set to 'connected' and this instance is returned; on any exception the
   * current state falls back to 'disconnected' and the error is rethrown.
   *
   * @returns this Io instance
   */
  public async init(): Promise<Io> {
    log.verbose('Io', 'init()')

    this.targetState('connected')
    this.currentState('connecting')

    try {
      // event hooks first, so nothing emitted by wechaty is missed
      await this.initEventHook()
      await this.initWebSocket()

      this.currentState('connected')
      return this
    } catch (err) {
      log.warn('Io', 'init() exception: %s', err.message)
      this.currentState('disconnected')
      throw err
    }
  }

122
  /**
   * Create a new WebSocket to the Io server and attach its event handlers.
   *
   * - 'open':    verifies the negotiated sub-protocol, marks the state as
   *              'connected', resets the reconnect delay and sends a 'sys'
   *              greeting event.
   * - 'message': decodes the frame as an IoEvent (falling back to a 'raw'
   *              event when it is not valid JSON) and dispatches on its name.
   * - 'error':   re-emitted on the wechaty instance as an 'error' event.
   * - 'close':   hands over to `reconnect()`.
   *
   * @returns a promise that is already resolved with the new socket; it does
   *          NOT wait for the connection to open
   */
  private initWebSocket() {
    log.verbose('Io', 'initWebSocket()')
    this.currentState('connecting')

    const auth = 'Token ' + this.setting.token
    const headers = { 'Authorization': auth }

    // XXX quick and dirty: use no ssl for APIHOST other than official
    const scheme = /api\.wechaty\.io/.test(this.setting.apihost) ? 'wss://' : 'ws://'
    const endpoint = scheme + this.setting.apihost + '/v0/websocket'

    const ws = this.ws = new WebSocket(endpoint, this.protocol, { headers })

    ws.on('open', () => {
      if (this.protocol !== ws.protocol) {
        log.error('Io', 'initWebSocket() require protocol[%s] failed', this.protocol)
        // XXX deal with error?
      }
      log.verbose('Io', 'initWebSocket() connected with protocol [%s]', ws.protocol)
      this.currentState('connected')

      // FIXME: how to keep alive???
      // ws._socket.setKeepAlive(true, 100)

      // a successful connection resets the reconnect back-off
      this.reconnectTimeout = null

      const initEvent = {
        name: 'sys',
        payload: 'Wechaty version ' + this.setting.wechaty.version() + ` with UUID: ${this.uuid}`,
      }
      this.send(initEvent)
    })

    ws.on('message', data => {
      log.silly('Io', 'initWebSocket() ws.on(message): %s', data)

      // anything that cannot be decoded stays a 'raw' event
      const ioEvent: IoEvent = {
        name: 'raw',
        payload: data,
      }

      try {
        const obj = JSON.parse(data)
        ioEvent.name    = obj.name
        ioEvent.payload = obj.payload
      } catch (e) {
        log.verbose('Io', 'on(message) recv a non IoEvent data[%s]', data)
      }

      switch (ioEvent.name) {
        case 'botie': {
          const payload = ioEvent.payload
          // payload may be missing altogether on a malformed event
          if (payload && payload.onMessage) {
            const script = payload.script
            try {
              // SECURITY: this executes code pushed by the server.
              // Only safe as long as the endpoint and token are trusted.
              /* tslint:disable:no-eval */
              const fn = eval(script)
              if (typeof fn === 'function') {
                this.onMessage = fn
              } else {
                log.warn('Io', 'server pushed function is invalid')
              }
            } catch (e) {
              // a broken script must not take the whole process down
              log.warn('Io', 'server pushed function exception: %s', e && e.message)
            }
          }
          break
        }

        case 'reset': {
          log.verbose('Io', 'on(reset): %s', ioEvent.payload)
          this.setting.wechaty.reset(ioEvent.payload)
          break
        }

        case 'shutdown': {
          log.warn('Io', 'on(shutdown): %s', ioEvent.payload)
          process.exit(0)
          break
        }

        case 'update': {
          // NOTE(review): the label says 'report' while the event is 'update'
          log.verbose('Io', 'on(report): %s', ioEvent.payload)

          const user = this.setting.wechaty.user()
          if (user) {
            const loginEvent = {
              name: 'login',
              payload: user,
            }
            this.send(loginEvent)
          }

          // XXX: Puppet should not has `scan` variable ...
          const scan = this.setting.wechaty
                        && this.setting.wechaty.puppet
                        && this.setting.wechaty.puppet['scan']
          if (scan) {
            const scanEvent = {
              name: 'scan',
              payload: scan,
            }
            this.send(scanEvent)
          }
          break
        }

        case 'sys':
          // do nothing
          break

        default:
          log.warn('Io', 'UNKNOWN on(%s): %s', ioEvent.name, ioEvent.payload)
          break
      }
    })

    ws.on('error', e => {
      log.warn('Io', 'initWebSocket() error event[%s]', e.message)
      this.setting.wechaty.emit('error', e)

      // when `error`, there must have already a `close` event
      // we should not call this.reconnect() again
    })

    ws.on('close', (code, message) => {
      log.warn('Io', 'initWebSocket() close event[%d: %s]', code, message)
      ws.close()
      this.reconnect()
    })

    return Promise.resolve(ws)
  }

258
  /**
   * Schedule a new `initWebSocket()` attempt after a growing delay.
   *
   * Nothing is scheduled when the target state is 'disconnected', when the
   * socket is already open, or when a reconnect timer is already pending.
   * The delay starts at 1 ms and is tripled on each call while it is still
   * below 10000 ms.
   */
  private reconnect() {
    log.verbose('Io', 'reconnect()')

    if (this.targetState() === 'disconnected') {
      log.verbose('Io', 'reconnect() canceled because targetState() === disconnected')
      return
    }
    if (this.connected()) {
      log.warn('Io', 'reconnect() on a already connected io')
      return
    }
    if (this.reconnectTimer) {
      log.warn('Io', 'reconnect() on a already re-connecting io')
      return
    }

    // work out the delay for this attempt
    const previousDelay = this.reconnectTimeout
    if (!previousDelay) {
      this.reconnectTimeout = 1
    } else if (previousDelay < 10000) {
      this.reconnectTimeout = previousDelay * 3
    }
    const delay = this.reconnectTimeout

    log.warn('Io', 'reconnect() will reconnect after %d s', Math.floor(delay / 1000))

    this.reconnectTimer = setTimeout(() => {
      // clear the handle first so later reconnect() calls are not blocked
      this.reconnectTimer = null
      this.initWebSocket()
    }, delay)
  }

292
  /**
   * Subscribe to the wechaty events that are relayed to the Io server.
   *
   * 'message' goes to `ioMessage`; 'scan', 'login', 'logout', 'heartbeat'
   * and 'error' are each wrapped into an IoEvent and passed to `send()`:
   * - 'error' payloads are converted with `toString()`
   * - 'heartbeat' payloads are wrapped as `{ uuid, data }`
   * - every other payload is forwarded unchanged
   *
   * @returns an already-resolved promise
   */
  private initEventHook() {
    log.verbose('Io', 'initEventHook()')
    const wechaty = this.setting.wechaty

    wechaty.on('message', this.ioMessage)

    const relayedEvents = ['scan', 'login', 'logout', 'heartbeat', 'error']

    relayedEvents.forEach(event => {
      wechaty.on(event, data => {
        const ioEvent = {
          name: event,
          payload: data,
        }

        if (event === 'login' || event === 'logout') {
          if (data instanceof Contact) {
            // the Contact itself is sent (not its raw `obj`)
            ioEvent.payload = data
          }
        } else if (event === 'error') {
          ioEvent.payload = data.toString()
        } else if (event === 'heartbeat') {
          ioEvent.payload = {
            uuid: this.uuid,
            data: data,
          }
        }

        this.send(ioEvent)
      })
    })

    return Promise.resolve()
  }

354
  /**
   * Queue an event (if one is given) and flush the queue to the socket.
   *
   * Events are always buffered first, so anything sent while the socket is
   * not open is kept and delivered by a later call. Calling `send()` with no
   * argument just attempts a flush.
   *
   * @param ioEvent - event to queue; omit to only flush
   * @returns `true` when the buffer was flushed, `false` when the socket is
   *          not connected and the events stay buffered
   */
  private send(ioEvent?: IoEvent): boolean {
    if (ioEvent) {
      log.silly('Io', 'send(%s: %s)', ioEvent.name, ioEvent.payload)
      this._eventBuffer.push(ioEvent)
    } else {
      log.silly('Io', 'send()')
    }

    if (!this.connected()) {
      log.verbose('Io', 'send() without a connected websocket, eventBuffer.length = %d', this._eventBuffer.length)
      return false
    }

    // drain in FIFO order
    while (this._eventBuffer.length) {
      this.ws.send(
        JSON.stringify(
          this._eventBuffer.shift()
        )
      )
    }
    return true
  }
373

374
  /**
   * Close the WebSocket and mark this Io as disconnected.
   *
   * Sets the target state to 'disconnected' so that the socket's 'close'
   * handler does not schedule a reconnect. Safe to call when no socket has
   * been created yet.
   *
   * @returns an already-resolved promise
   */
  private close() {
    log.verbose('Io', 'close()')
    this.targetState('disconnected')
    this.currentState('disconnecting')

    // there is no socket if we are closed before init() created one
    if (this.ws) {
      this.ws.close()
    }
    this.currentState('disconnected')
    // TODO: remove listener for this.setting.wechaty.on(message )
    return Promise.resolve()
  }

385
  /**
   * Take the Io offline for good.
   *
   * Attempts one last flush of buffered events, discards whatever is left,
   * cancels a pending reconnect timer and closes the socket.
   *
   * @returns an already-resolved promise
   */
  public quit() {
    this.targetState('disconnected')
    this.currentState('disconnecting')

    // last chance for buffered IoEvents; anything unsent is dropped
    this.send()
    this._eventBuffer = []

    const pendingTimer = this.reconnectTimer
    if (pendingTimer) {
      clearTimeout(pendingTimer)
      this.reconnectTimer = null
    }

    this.close()
    this.currentState('disconnected')

    return Promise.resolve()
  }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
403 404 405 406 407
  /**
   * Placeholder 'message' handler.
   *
   * It only writes a log line; it is meant to be overwritten by a handler
   * supplied from the cloud.
   *
   * @param m - the incoming message (unused)
   */
  private ioMessage(m) {
    log.verbose(
      'Io',
      'ioMessage() is a nop function before be overwriten from cloud',
    )
  }
411

412 413 414 415 416
}

/**
 * Expose `Wechaty`.
 */
417 418
// module.exports = Io.default = Io.Io = Io
export default Io