/**
 *
 * wechaty: Wechat for Bot. and for human who talk to bot/robot
 *
 * Class Io
 * http://www.wechaty.io
 *
 * License: ISC
 * https://github.com/zixia/wechaty
 *
 */
// const EventEmitter  = require('events')
import * as WebSocket from 'ws'
// const co            = require('co')

import Config, {
  WechatyEventType
}   from './config'
19 20 21
import Contact  from './contact'
import Wechaty  from './wechaty'
import log      from './brolog-env'
22

23 24 25
/**
 * Options accepted by the `Io` constructor.
 */
type IoSetting = {
  /** Wechaty instance whose events are forwarded; its `uuid` identifies this Io. */
  wechaty:    Wechaty
  /** Credential sent to the server as the `Authorization: Token <token>` header. */
  token:      string
  /** Host of the websocket endpoint; the constructor falls back to `Config.apihost`. */
  apihost?:   string
  /**
   * Protocol name, joined with the uuid as `<protocol>|<uuid>` to form the
   * websocket sub-protocol; the constructor falls back to `Config.DEFAULT_PROTOCOL`.
   */
  protocol?:  string
}

/**
 * Envelope for everything exchanged over the Io websocket; instances are
 * serialized with `JSON.stringify` before being sent.
 */
type IoEvent = {
  /** Event name, e.g. 'sys', 'login', 'scan', 'heartbeat'. */
  name: string;
  /** Event data; its shape depends on `name`. */
  payload: any;
}
34

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
35
/**
 * Io bridges a Wechaty instance to a remote server over a WebSocket.
 *
 * Local Wechaty events (`scan`, `login`, `logout`, `heartbeat`, `error`) are
 * wrapped as `{ name, payload }` objects and sent to the server as JSON.
 * Events that cannot be sent yet are kept in an in-memory buffer and flushed
 * the next time `send()` runs on an open socket. Commands received from the
 * server (`botie`, `reset`, `shutdown`, `update`, `sys`) are dispatched by the
 * `message` listener installed in `initWebSocket()`.
 */
class Io {
  /** Copied from `setting.wechaty.uuid` at construction time. */
  public uuid: string

  /** WebSocket sub-protocol requested from the server: `<setting.protocol>|<uuid>`. */
  private protocol: string
  /** Events waiting for an open socket, in send order. */
  private _eventBuffer: IoEvent[] = []
  private ws: WebSocket

  private _currentState: string
  private _targetState: string

  /** Pending reconnect timer, or null when no reconnect is scheduled. */
  private reconnectTimer: ReturnType<typeof setTimeout> | null
  /** Current reconnect delay in milliseconds; reset to null on a successful open. */
  private reconnectTimeout: number | null

  /** Message handler pushed by the server through a `botie` event, if any. */
  private onMessage: ((m: any) => any) | undefined

  /**
   * @param setting connection options. NOTE: `apihost` and `protocol` are
   *                filled in on the passed object itself when absent.
   * @throws Error when `setting.wechaty` or `setting.token` is missing.
   */
  constructor(private setting: IoSetting) {
    if (!setting.wechaty || !setting.token) {
      throw new Error('Io must has wechaty & token set')
    }

    setting.apihost   = setting.apihost   || Config.apihost
    setting.protocol  = setting.protocol  || Config.DEFAULT_PROTOCOL

    this.uuid     = setting.wechaty.uuid
    this.protocol = setting.protocol + '|' + setting.wechaty.uuid

    log.verbose('Io', 'instantiated with apihost[%s], token[%s], protocol[%s], uuid[%s]'
              , setting.apihost
              , setting.token
              , setting.protocol
              , this.uuid
              )

    this.targetState('disconnected')
    this.currentState('disconnected')
  }

  /**
   * Get, or set when `newState` is given, the state the caller wants to reach.
   * targetState : 'connected' | 'disconnected'
   */
  private targetState(newState?: string): string {
    if (newState) {
      log.verbose('Io', 'targetState(%s)', newState)
      this._targetState = newState
    }
    return this._targetState
  }

  /**
   * Get, or set when `newState` is given, the state the connection is in now.
   * currentState : 'connecting' | 'connected' | 'disconnecting' | 'disconnected'
   */
  private currentState(newState?: string): string {
    if (newState) {
      log.verbose('Io', 'currentState(%s)', newState)
      this._currentState = newState
    }
    return this._currentState
  }

  public toString() { return 'Class Io(' + this.setting.token + ')' }

  /** True only while a socket exists and its readyState is OPEN. */
  private connected() { return this.ws && this.ws.readyState === WebSocket.OPEN }

  /**
   * Hook the Wechaty events and start the websocket connection.
   *
   * The returned promise resolves as soon as the socket has been created, not
   * when it is open, so the state is left at 'connecting' here; the socket's
   * `open` listener is what moves it to 'connected'.
   *
   * @returns this instance
   * @throws rethrows whatever `initEventHook()` / `initWebSocket()` throws,
   *         after resetting the current state to 'disconnected'.
   */
  public async init(): Promise<Io> {
    log.verbose('Io', 'init()')

    this.targetState('connected')
    this.currentState('connecting')

    try {
      await this.initEventHook()
      await this.initWebSocket()
      return this
    } catch (e) {
      log.warn('Io', 'init() exception: %s', e instanceof Error ? e.message : String(e))
      this.currentState('disconnected')
      throw e
    }
  }

  /**
   * Create the websocket and install its `open`, `message`, `error` and
   * `close` listeners.
   *
   * @returns a promise already resolved with the new socket; it does not wait
   *          for the connection to open.
   */
  private initWebSocket() {
    log.verbose('Io', 'initWebSocket()')
    this.currentState('connecting')

    // const auth = 'Basic ' + new Buffer(this.setting.token + ':X').toString('base64')
    const auth = 'Token ' + this.setting.token
    const headers = { 'Authorization': auth }

    // XXX quick and dirty: use no ssl for APIHOST other than official
    const scheme = /api\.wechaty\.io/.test(this.setting.apihost) ? 'wss://' : 'ws://'
    const endpoint = scheme + this.setting.apihost + '/v0/websocket'

    const ws = this.ws = new WebSocket(endpoint, this.protocol, { headers })

    ws.on('open', () => {
      if (this.protocol !== ws.protocol) {
        log.error('Io', 'initWebSocket() require protocol[%s] failed', this.protocol)
        // XXX deal with error?
      }
      log.verbose('Io', 'initWebSocket() connected with protocol [%s]', ws.protocol)
      this.currentState('connected')

      // FIXME: how to keep alive???
      // ws._socket.setKeepAlive(true, 100)

      // A successful open resets the reconnect back-off.
      this.reconnectTimeout = null

      // send() also flushes every event buffered while the socket was down.
      this.send({
        name: 'sys',
        payload: 'Wechaty version ' + this.setting.wechaty.version() + ` with UUID: ${this.uuid}`,
      })
    })

    // NOTE: this handler stays inline (not extracted into a method) because the
    // direct eval() below sees the enclosing lexical scope.
    ws.on('message', (data, flags) => {
      log.silly('Io', 'initWebSocket() ws.on(message): %s', data)
      // flags.binary will be set if a binary data is received.
      // flags.masked will be set if the data was masked.

      // Anything that is not a JSON object is treated as a 'raw' event.
      const ioEvent: IoEvent = {
        name: 'raw',
        payload: data,
      }

      try {
        const obj = JSON.parse(data)
        ioEvent.name    = obj.name
        ioEvent.payload = obj.payload
      } catch (e) {
        log.verbose('Io', 'on(message) recv a non IoEvent data[%s]', data)
      }

      switch (ioEvent.name) {
        case 'botie': {
          const payload = ioEvent.payload
          // A botie event without a payload is ignored instead of throwing.
          if (payload && payload.onMessage) {
            const script = payload.script
            try {
              // SECURITY(review): this executes code received from the server
              // with full access to this process. It is only as safe as the
              // server and the transport (plain ws:// is used for any apihost
              // other than api.wechaty.io).
              /* tslint:disable:no-eval */
              const fn = eval(script)
              if (typeof fn === 'function') {
                this.onMessage = fn
              } else {
                log.warn('Io', 'server pushed function is invalid')
              }
            } catch (e) {
              // A script that fails to evaluate must not take the listener down.
              log.warn('Io', 'server pushed function is invalid')
            }
          }
          break
        }

        case 'reset':
          log.verbose('Io', 'on(reset): %s', ioEvent.payload)
          this.setting.wechaty.reset(ioEvent.payload)
          break

        case 'shutdown':
          log.warn('Io', 'on(shutdown): %s', ioEvent.payload)
          process.exit(0)
          break

        case 'update': {
          log.verbose('Io', 'on(update): %s', ioEvent.payload)

          // Re-send the login state, if there is a logged-in user.
          const user = this.setting.wechaty.user()
          if (user) {
            this.send({
              name: 'login',
              // payload: user.obj
              payload: user,
            })
          }

          // XXX: Puppet should not has `scan` variable ...
          const scan = this.setting.wechaty
                        && this.setting.wechaty.puppet
                        && this.setting.wechaty.puppet['scan']
          if (scan) {
            this.send({
              name: 'scan',
              payload: scan,
            })
          }
          break
        }

        case 'sys':
          // do nothing
          break

        default:
          log.warn('Io', 'UNKNOWN on(%s): %s', ioEvent.name, ioEvent.payload)
          break
      }
    })

    ws.on('error', e => {
      log.warn('Io', 'initWebSocket() error event[%s]', e.message)
      this.setting.wechaty.emit('error', e)

      // when `error`, there must have already a `close` event
      // we should not call this.reconnect() again
      //
      // this.close()
      // this.reconnect()
    })
    .on('close', (code, message) => {
      log.warn('Io', 'initWebSocket() close event[%d: %s]', code, message)
      ws.close()
      this.reconnect()
    })

    return Promise.resolve(ws)
  }

  /**
   * Schedule a new connection attempt, unless disconnecting is the target
   * state, the socket is already open, or an attempt is already scheduled.
   * The delay starts at 1 ms and is tripled on every call while it is still
   * below 10000 ms.
   */
  private reconnect() {
    log.verbose('Io', 'reconnect()')

    if (this.targetState() === 'disconnected') {
      log.verbose('Io', 'reconnect() canceled because targetState() === disconnected')
      return
    }

    if (this.connected()) {
      log.warn('Io', 'reconnect() on a already connected io')
      return
    }
    if (this.reconnectTimer) {
      log.warn('Io', 'reconnect() on a already re-connecting io')
      return
    }

    if (!this.reconnectTimeout) {
      this.reconnectTimeout = 1
    } else if (this.reconnectTimeout < 10000) {
      this.reconnectTimeout *= 3
    }

    log.warn('Io', 'reconnect() will reconnect after %d s', Math.floor(this.reconnectTimeout / 1000))
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null
      this.initWebSocket()
    }, this.reconnectTimeout)
  }

  /**
   * Subscribe to the Wechaty events that are relayed to the server.
   *
   * @returns an already resolved promise
   */
  private initEventHook() {
    log.verbose('Io', 'initEventHook()')
    const wechaty = this.setting.wechaty

    // Arrow wrapper: an emitter would otherwise call the listener with its own
    // `this`, so `ioMessage` could not reach this Io instance.
    wechaty.on('message', m => this.ioMessage(m))

    const hookEvents: WechatyEventType[] = [
      'scan',
      'login',
      'logout',
      'heartbeat',
      'error',
    ]
    hookEvents.forEach(event => {
      wechaty.on(event, data => {
        const ioEvent: IoEvent = {
          name: event,
          payload: data,
        }

        switch (event) {
          case 'login':
          case 'logout':
            if (data instanceof Contact) {
              // ioEvent.payload = data.obj
              ioEvent.payload = data
            }
            break

          case 'error':
            // String() gives the same text as toString() but tolerates a
            // missing error argument.
            ioEvent.payload = String(data)
            break

          case 'heartbeat':
            ioEvent.payload = {
              uuid: this.uuid,
              data: data,
            }
            break

          default:
            break
        }

        this.send(ioEvent)
      })
    })

    // wechaty.on('message', m => {
    //   const text = (m.room() ? '['+m.room().name()+']' : '')
    //               + '<'+m.from().name()+'>'
    //               + ':' + m.toStringDigest()
    //   const messageEvent = {
    //     name:       'message'
    //     , payload:  text
    //   }
    //   this.send(messageEvent)
    // })

    return Promise.resolve()
  }

  /**
   * Queue `ioEvent` (when given) and flush the whole queue if the socket is open.
   *
   * @param ioEvent event to append to the buffer; omit to only flush.
   * @returns true when the buffer was flushed to the socket, false when the
   *          socket is not open and the events stay buffered.
   */
  private send(ioEvent?: IoEvent): boolean {
    if (ioEvent) {
      log.silly('Io', 'send(%s: %s)', ioEvent.name, ioEvent.payload)
      this._eventBuffer.push(ioEvent)
    } else { log.silly('Io', 'send()') }

    if (!this.connected()) {
      log.verbose('Io', 'send() without a connected websocket, eventBuffer.length = %d', this._eventBuffer.length)
      return false
    }

    while (this._eventBuffer.length) {
      this.ws.send(
        JSON.stringify(
          this._eventBuffer.shift()
        )
      )
    }
    return true
  }

  /**
   * Close the socket (if one was ever created) and mark the Io disconnected.
   *
   * @returns an already resolved promise
   */
  private close() {
    log.verbose('Io', 'close()')
    this.targetState('disconnected')
    this.currentState('disconnecting')

    // `ws` does not exist until initWebSocket() has run, e.g. when quit() is
    // called before init().
    if (this.ws) {
      this.ws.close()
    }
    this.currentState('disconnected')
    // TODO: remove listener for this.setting.wechaty.on(message )
    return Promise.resolve()
  }

  /**
   * Shut the Io down: try to flush buffered events, drop whatever could not be
   * sent, cancel any scheduled reconnect and close the socket.
   *
   * @returns an already resolved promise
   */
  public quit() {
    this.targetState('disconnected')
    this.currentState('disconnecting')

    // try to send IoEvents in buffer
    this.send()
    this._eventBuffer = []

    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
    this.close()

    this.currentState('disconnected')
    return Promise.resolve()
  }

  /**
   * Listener for Wechaty `message` events.
   *
   * It is a no-op until the server pushes a handler through a `botie` event;
   * after that every message is passed to that handler. Errors thrown by the
   * handler, or rejections of a promise it returns, are logged and swallowed
   * so a bad script cannot break message delivery.
   */
  private ioMessage(m: any): void {
    const handler = this.onMessage
    if (typeof handler !== 'function') {
      log.verbose('Io', 'ioMessage() is a nop function before be overwriten from cloud')
      return
    }

    try {
      // Promise.resolve() lets one catch cover both sync and async handlers.
      Promise.resolve(handler.call(this, m)).catch(e => {
        log.warn('Io', 'ioMessage() onMessage rejection: %s', e instanceof Error ? e.message : String(e))
      })
    } catch (e) {
      log.warn('Io', 'ioMessage() onMessage exception: %s', e instanceof Error ? e.message : String(e))
    }
  }

}

/**
 * Expose `Io`.
 */
// module.exports = Io.default = Io.Io = Io
export default Io