io.ts 10.9 KB
Newer Older
1 2
/**
 *
3
 * Wechaty: Wechat for Bot. and for human who talk to bot/robot
4
 *
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
5
 * Class Io
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
6
 * http://www.wechaty.io
7 8 9 10 11
 *
 * Licenst: ISC
 * https://github.com/zixia/wechaty
 *
 */
12
import * as WebSocket from 'ws'
13

14
import {
L
lijiarui 已提交
15
  Config,
16
  // WechatyEventName
L
lijiarui 已提交
17
  log,
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
18
}                   from './config'
19

20 21
import { StateMonitor } from './state-monitor'
import { Wechaty }      from './wechaty'
22

23
export type IoSetting = {
L
lijiarui 已提交
24 25 26 27
  wechaty:    Wechaty,
  token:      string,
  apihost?:   string,
  protocol?:  string,
28 29
}

30 31 32 33 34
type IoEventName =  'botie'
                  | 'error'
                  | 'heartbeat'
                  | 'login'
                  | 'message'
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
35
                  | 'update'
36 37 38
                  | 'raw'
                  | 'reset'
                  | 'scan'
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
39
                  | 'sys'
40 41
                  | 'shutdown'

42
type IoEvent = {
L
lijiarui 已提交
43 44
  name:     IoEventName,
  payload:  any,
45
}
46

47
export class Io {
48 49 50
  public uuid: string

  private protocol: string
51
  private eventBuffer: IoEvent[] = []
52 53
  private ws: WebSocket

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
54
  private state = new StateMonitor<'online', 'offline'>('Io', 'offline')
55

56
  private reconnectTimer: NodeJS.Timer | null
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
57
  private reconnectTimeout: number | null
58

59 60 61 62
  private onMessage: Function

  constructor(private setting: IoSetting) {
    if (!setting.wechaty || !setting.token) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
63
      throw new Error('Io must has wechaty & token set')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
64
    }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
65

66 67
    setting.apihost   = setting.apihost   || Config.apihost
    setting.protocol  = setting.protocol  || Config.DEFAULT_PROTOCOL
68

69
    this.uuid     = setting.wechaty.uuid
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
70

71
    this.protocol = setting.protocol + '|' + setting.wechaty.uuid
L
lijiarui 已提交
72 73 74 75 76
    log.verbose('Io', 'instantiated with apihost[%s], token[%s], protocol[%s], uuid[%s]',
                setting.apihost,
                setting.token,
                setting.protocol,
                this.uuid,
77
              )
78 79
  }

80
  public toString() { return 'Class Io(' + this.setting.token + ')'}
81

82
  private connected() { return this.ws && this.ws.readyState === WebSocket.OPEN }
83

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
84
  public async init(): Promise<void> {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
85
    log.verbose('Io', 'init()')
86

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
87 88
    this.state.target('online')
    this.state.current('online', false)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
89

90 91 92
    try {
      await this.initEventHook()
      await this.initWebSocket()
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
93

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
94 95
      this.state.current('online')

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
96
      return
97
    } catch (e) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
98
      log.warn('Io', 'init() exception: %s', e.message)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
99
      this.state.current('offline')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
100
      throw e
101
    }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
102 103
  }

104
  private initWebSocket() {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
105
    log.verbose('Io', 'initWebSocket()')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
106
    this.state.current('online', false)
107

108 109
    // const auth = 'Basic ' + new Buffer(this.setting.token + ':X').toString('base64')
    const auth = 'Token ' + this.setting.token
110 111
    const headers = { 'Authorization': auth }

112 113 114
    if (!this.setting.apihost) {
      throw new Error('no apihost')
    }
115
    let endpoint = 'wss://' + this.setting.apihost + '/v0/websocket'
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
116 117

    // XXX quick and dirty: use no ssl for APIHOST other than official
118 119
    if (!/api\.wechaty\.io/.test(this.setting.apihost)) {
      endpoint = 'ws://' + this.setting.apihost + '/v0/websocket'
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
120 121 122
    }

    const ws = this.ws = new WebSocket(endpoint, this.protocol, { headers })
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
123

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
124
    ws.on('open', () => {
125
      if (this.protocol !== ws.protocol) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
126
        log.error('Io', 'initWebSocket() require protocol[%s] failed', this.protocol)
127 128
        // XXX deal with error?
      }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
129
      log.verbose('Io', 'initWebSocket() connected with protocol [%s]', ws.protocol)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
130 131
      // this.currentState('connected')
      this.state.current('online')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
132

133
      // FIXME: how to keep alive???
134
      // ws._socket.setKeepAlive(true, 100)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
135

136
      this.reconnectTimeout = null
137

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
138
      const initEvent: IoEvent = {
L
lijiarui 已提交
139 140
        name: 'sys',
        payload: 'Wechaty version ' + this.setting.wechaty.version() + ` with UUID: ${this.uuid}`,
141 142
      }
      this.send(initEvent)
143

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
144
    })
145

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
146
    ws.on('message', (data, flags) => {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
147
      log.silly('Io', 'initWebSocket() ws.on(message): %s', data)
148 149
      // flags.binary will be set if a binary data is received.
      // flags.masked will be set if the data was masked.
150

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
151
      const ioEvent: IoEvent = {
L
lijiarui 已提交
152 153
        name: 'raw',
        payload: data,
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
154
      }
155

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
156 157 158 159 160 161 162 163 164 165 166 167 168
      try {
        const obj = JSON.parse(data)
        ioEvent.name    = obj.name
        ioEvent.payload = obj.payload
      } catch (e) {
        log.verbose('Io', 'on(message) recv a non IoEvent data[%s]', data)
      }

      switch (ioEvent.name) {
        case 'botie':
          const payload = ioEvent.payload
          if (payload.onMessage) {
            const script = payload.script
169
            /* tslint:disable:no-eval */
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
170 171 172 173 174 175 176 177
            const fn = eval(script)
            if (typeof fn === 'function') {
              this.onMessage = fn
            } else {
              log.warn('Io', 'server pushed function is invalid')
            }
          }
          break
178

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
179 180
        case 'reset':
          log.verbose('Io', 'on(reset): %s', ioEvent.payload)
181
          this.setting.wechaty.reset(ioEvent.payload)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
182
          break
183

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
184 185 186 187 188
        case 'shutdown':
          log.warn('Io', 'on(shutdown): %s', ioEvent.payload)
          process.exit(0)
          break

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
189 190
        case 'update':
          log.verbose('Io', 'on(report): %s', ioEvent.payload)
191
          const user = this.setting.wechaty.puppet ? this.setting.wechaty.puppet.user : null
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
192
          if (user) {
193
            const loginEvent: IoEvent = {
L
lijiarui 已提交
194
              name:       'login',
195
              // , payload:  user.obj
L
lijiarui 已提交
196
              payload:  user,
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
197 198
            }
            this.send(loginEvent)
199 200
          }

201 202 203 204
          // XXX: Puppet should not has `scan` variable ...
          const scan = this.setting.wechaty
                        && this.setting.wechaty.puppet
                        && this.setting.wechaty.puppet['scan']
205
          if (scan) {
206
            const scanEvent: IoEvent = {
L
lijiarui 已提交
207 208
              name: 'scan',
              payload: scan,
209 210 211
            }
            this.send(scanEvent)
          }
212

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
213
          break
214

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
215 216 217
        case 'sys':
          // do nothing
          break
218

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
219 220 221
        default:
          log.warn('Io', 'UNKNOWN on(%s): %s', ioEvent.name, ioEvent.payload)
          break
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
222
      }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
223
    })
224

225 226
    ws.on('error', e => {
      log.warn('Io', 'initWebSocket() error event[%s]', e.message)
227
      this.setting.wechaty.emit('error', e)
228

229 230 231 232 233 234 235 236 237
      // when `error`, there must have already a `close` event
      // we should not call this.reconnect() again
      //
      // this.close()
      // this.reconnect()
    })
    .on('close', (code, message) => {
      log.warn('Io', 'initWebSocket() close event[%d: %s]', code, message)
      ws.close()
238
      this.reconnect()
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
239 240
    })

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
241
    return Promise.resolve(ws)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
242 243
  }

244
  private reconnect() {
Huan (李卓桓)'s avatar
log  
Huan (李卓桓) 已提交
245 246
    log.verbose('Io', 'reconnect()')

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
247
    if (this.state.target() === 'offline') {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
248
      log.warn('Io', 'reconnect() canceled because state.target() === offline')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
249 250 251
      return
    }

252
    if (this.connected()) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
253
      log.warn('Io', 'reconnect() on a already connected io')
254 255 256
      return
    }
    if (this.reconnectTimer) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
257
      log.warn('Io', 'reconnect() on a already re-connecting io')
258 259 260 261 262 263 264
      return
    }

    if (!this.reconnectTimeout) {
      this.reconnectTimeout = 1
    } else if (this.reconnectTimeout < 10000) {
      this.reconnectTimeout *= 3
265
    }
266

267
    log.warn('Io', 'reconnect() will reconnect after %d s', Math.floor(this.reconnectTimeout / 1000))
268 269 270
    this.reconnectTimer = setTimeout(_ => {
      this.reconnectTimer = null
      this.initWebSocket()
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
271
    }, this.reconnectTimeout)// as any as NodeJS.Timer
272 273
  }

274
  private initEventHook() {
Huan (李卓桓)'s avatar
log  
Huan (李卓桓) 已提交
275
    log.verbose('Io', 'initEventHook()')
276
    const wechaty = this.setting.wechaty
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
277 278 279

    wechaty.on('message', this.ioMessage)

280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328
    wechaty.on('scan', (url, code) => this.send({ name: 'scan', payload: { url, code } }))

    wechaty.on('login'  , user => this.send({ name: 'login', payload: user }))
    wechaty.on('logout' , user => this.send({ name: 'login', payload: user }))

    wechaty.on('heartbeat', data  => this.send({ name: 'heartbeat', payload: { uuid: this.uuid, data } }))
    wechaty.on('error'    , error => this.send({ name: 'error', payload: error }))

    // const hookEvents: WechatyEventName[] = [
    //   'scan'
    //   , 'login'
    //   , 'logout'
    //   , 'heartbeat'
    //   , 'error'
    // ]
    // hookEvents.map(event => {
    //   wechaty.on(event, (data) => {
    //     const ioEvent: IoEvent = {
    //       name:       event
    //       , payload:  data
    //     }

    //     switch (event) {
    //       case 'login':
    //       case 'logout':
    //         if (data instanceof Contact) {
    //           // ioEvent.payload = data.obj
    //           ioEvent.payload = data
    //         }
    //         break

    //       case 'error':
    //         ioEvent.payload = data.toString()
    //         break

        //   case 'heartbeat':
        //     ioEvent.payload = {
        //       uuid: this.uuid
        //       , data: data
        //     }
        //     break

        //   default:
        //     break
        // }

    //     this.send(ioEvent)
    //   })
    // })
329

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
330 331 332 333
    // wechaty.on('message', m => {
    //   const text = (m.room() ? '[' + m.room().topic() + ']' : '')
    //               + '<' + m.from().name() + '>'
    //               + ':' + m.toStringDigest()
334

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
335 336
    //   this.send({ name: 'message', payload:  text })
    // })
337

338
    return
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
339 340
  }

341
  private async send(ioEvent?: IoEvent): Promise<void> {
342 343
    if (ioEvent) {
      log.silly('Io', 'send(%s: %s)', ioEvent.name, ioEvent.payload)
344
      this.eventBuffer.push(ioEvent)
345
    } else { log.silly('Io', 'send()') }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
346 347

    if (!this.connected()) {
348
      log.verbose('Io', 'send() without a connected websocket, eventBuffer.length = %d', this.eventBuffer.length)
349
      return
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
350 351
    }

352
    const list: Promise<any>[] = []
353
    while (this.eventBuffer.length) {
354
      const p = new Promise((resolve, reject) => this.ws.send(
355
        JSON.stringify(
L
lijiarui 已提交
356 357 358
          this.eventBuffer.shift(),
        ),
        (err: Error) => {
359 360
          if (err)  { reject(err) }
          else      { resolve()   }
L
lijiarui 已提交
361
        },
362 363 364 365 366 367 368 369 370
      ))
      list.push(p)
    }

    try {
      await Promise.all(list)
    } catch (e) {
      log.error('Io', 'send() exceptio: %s', e.stack)
      throw e
371
    }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
372
  }
373

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
374
  private async close(): Promise<void> {
Huan (李卓桓)'s avatar
log  
Huan (李卓桓) 已提交
375
    log.verbose('Io', 'close()')
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
376 377
    this.state.target('offline')
    this.state.current('offline', false)
Huan (李卓桓)'s avatar
log  
Huan (李卓桓) 已提交
378

379
    this.ws.close()
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
380 381
    this.state.current('offline')

382
    // TODO: remove listener for this.setting.wechaty.on(message )
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
383
    return Promise.resolve()
384 385
  }

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
386
  public async quit(): Promise<void> {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
387 388
    this.state.target('offline')
    this.state.current('offline', false)
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
389

390
    // try to send IoEvents in buffer
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
391
    await this.send()
392
    this.eventBuffer = []
393

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
394 395
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
396
      this.reconnectTimer = null
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
397
    }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
398
    await this.close()
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
399

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
400 401 402
    // this.currentState('disconnected')
    this.state.current('offline')

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
403 404
    return Promise.resolve()
  }
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
405 406 407 408 409
  /**
   *
   * Prepare to be overwriten by server setting
   *
   */
410
  private ioMessage(m) {
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
411
    log.verbose('Io', 'ioMessage() is a nop function before be overwriten from cloud')
412
  }
413

414
}
415 416

export default Io