/**
 *   Wechaty - https://github.com/chatie/wechaty
 *
 *   @copyright 2016-2018 Huan LI <zixia@zixia.net>
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 *
 */
19
import * as WebSocket from 'ws'
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
20
import StateSwitch    from 'state-switch'
21

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
22
import PuppetWeb  from './puppet-web/'
23

24
import {
25
  config,
L
lijiarui 已提交
26
  log,
Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
27 28
}                 from './config'
import Wechaty    from './wechaty'
29

Huan (李卓桓)'s avatar
Huan (李卓桓) 已提交
30
/**
 * Settings accepted by the `Io` constructor.
 */
export interface IoOptions {
  /** Wechaty instance whose events are forwarded to the server; also supplies the `cuid`. */
  wechaty:    Wechaty,
  /** Credential sent to the server in the `Authorization: Token <token>` header. */
  token:      string,
  /** Host of the WebSocket endpoint. The constructor falls back to `config.apihost` when omitted. */
  apihost?:   string,
  /**
   * Protocol name; the constructor falls back to `config.default.DEFAULT_PROTOCOL`
   * when omitted and joins it with the cuid to form the WebSocket sub-protocol.
   */
  protocol?:  string,
}

37 38 39 40
/**
 * Names an `IoEvent` can carry.
 *
 * `raw` is the placeholder name given to an incoming frame before (or in
 * case it cannot be) parsed as JSON.
 */
type IoEventName =  'botie'
                  | 'error'
                  | 'heartbeat'
                  | 'login'
                  | 'logout'
                  | 'message'
                  | 'update'
                  | 'raw'
                  | 'reset'
                  | 'scan'
                  | 'sys'
                  | 'shutdown'

50
/**
 * One frame exchanged with the server. Outgoing events are serialised with
 * `JSON.stringify()`; incoming frames are parsed into this shape.
 */
interface IoEvent {
  /** Event name, used to dispatch incoming frames. */
  name:     IoEventName,
  /** Event-specific data; its shape depends on `name`. */
  payload:  any,
}
54

55
/**
 * Bridges a Wechaty instance to a remote API host over a WebSocket.
 *
 * Wechaty events are forwarded to the server as JSON-encoded `IoEvent`
 * frames, and frames pushed by the server (`botie`, `reset`, `shutdown`,
 * `update`, `logout`, ...) are dispatched in `wsOnMessage()`. Events produced
 * while the socket is not open stay in `eventBuffer` and are flushed by the
 * next `send()` that finds the socket open.
 */
export class Io {
  /** Copied from `options.wechaty.cuid`; also appended to the WebSocket sub-protocol. */
  public cuid: string

  /** Sub-protocol requested from the server: `<options.protocol>|<cuid>`. */
  private protocol    : string
  /** Events waiting to be written to the socket. */
  private eventBuffer : IoEvent[] = []
  /** Current socket; assigned by `initWebSocket()`, unset before `init()`. */
  private ws          : WebSocket

  private state = new StateSwitch('Io', log)

  /** Pending reconnect timer, or `null` when no reconnect is scheduled. */
  private reconnectTimer   : NodeJS.Timer | null = null
  /** Current back-off delay in milliseconds; reset to `null` once a socket opens. */
  private reconnectTimeout : number | null       = null

  /** Message handler installed by a server-pushed `botie` event; absent until then. */
  private onMessage?: (message: unknown) => unknown

  /**
   * @param options connection settings. NOTE: `apihost` and `protocol` are
   *                filled in on the passed object itself when they are missing.
   */
  constructor(
    private options: IoOptions,
  ) {
    options.apihost   = options.apihost   || config.apihost
    options.protocol  = options.protocol  || config.default.DEFAULT_PROTOCOL

    this.cuid     = options.wechaty.cuid
    this.protocol = options.protocol + '|' + options.wechaty.cuid

    // NOTE(review): the token is written to the verbose log here - confirm that is acceptable
    log.verbose('Io', 'instantiated with apihost[%s], token[%s], protocol[%s], cuid[%s]',
                      options.apihost,
                      options.token,
                      options.protocol,
                      this.cuid,
              )
  }

  public toString() {
    return `Io<${this.options.token}>`
  }

  /** @returns `true` only when a socket exists and is in the OPEN state. */
  private connected(): boolean {
    return !!this.ws && this.ws.readyState === WebSocket.OPEN
  }

  /**
   * Hook the Wechaty events and open the WebSocket.
   *
   * @throws whatever `initWebSocket()` throws (e.g. `no apihost`); the state
   *         is switched off before the error is re-thrown.
   */
  public async init(): Promise<void> {
    log.verbose('Io', 'init()')

    this.state.on('pending')

    try {
      this.initEventHook()
      this.ws = this.initWebSocket()

      this.state.on(true)
    } catch (e) {
      log.warn('Io', 'init() exception: %s', e.message)
      this.state.off(true)
      throw e
    }
  }

  /**
   * Subscribe to the Wechaty events that are relayed to the server.
   * `message` events go to `ioMessage()` instead of being sent.
   */
  private initEventHook(): void {
    log.verbose('Io', 'initEventHook()')
    const wechaty = this.options.wechaty

    // NOTE(review): an `Error` payload becomes `{}` once send() runs it through
    // JSON.stringify() - confirm whether the server expects `error.toString()` instead.
    wechaty.on('error'    , error =>        this.forward({ name: 'error',      payload: error }))
    wechaty.on('heartbeat', data  =>        this.forward({ name: 'heartbeat',  payload: { cuid: this.cuid, data } }))
    wechaty.on('login',     user =>         this.forward({ name: 'login',      payload: user }))
    wechaty.on('logout' ,   user =>         this.forward({ name: 'logout',     payload: user }))
    wechaty.on('message',   message =>      this.ioMessage(message))
    wechaty.on('scan',      (url, code) =>  this.forward({ name: 'scan',       payload: { url, code } }))
  }

  /**
   * Create a new socket to `<apihost>/v0/websocket`, store it in `this.ws`
   * and attach the open/message/error/close handlers.
   *
   * @throws Error('no apihost') when `options.apihost` is empty
   */
  private initWebSocket() {
    log.verbose('Io', 'initWebSocket()')

    const auth = 'Token ' + this.options.token
    const headers = { 'Authorization': auth }

    if (!this.options.apihost) {
      throw new Error('no apihost')
    }
    let endpoint = 'wss://' + this.options.apihost + '/v0/websocket'

    // XXX quick and dirty: use no ssl for APIHOST other than official
    // FIXME: use a configurable VARIABLE for the domain name here:
    if (!/api\.chatie\.io/.test(this.options.apihost)) {
      endpoint = 'ws://' + this.options.apihost + '/v0/websocket'
    }

    const ws = this.ws = new WebSocket(endpoint, this.protocol, { headers })

    ws.on('open',     () => this.wsOnOpen(ws))
    ws.on('message',  data => this.wsOnMessage(data))
    ws.on('error',    e => this.wsOnError(e))
    ws.on('close',    (code, reason) => this.wsOnClose(ws, code, reason))

    return ws
  }

  /** Socket opened: reset the reconnect back-off and announce ourselves with a `sys` event. */
  private wsOnOpen(ws: WebSocket): void {
    if (this.protocol !== ws.protocol) {
      log.error('Io', 'initWebSocket() require protocol[%s] failed', this.protocol)
      // XXX deal with error?
    }
    log.verbose('Io', 'initWebSocket() connected with protocol [%s]', ws.protocol)

    // FIXME: how to keep alive???
    // ws._socket.setKeepAlive(true, 100)

    this.reconnectTimeout = null

    const name    = 'sys'
    const payload = 'Wechaty version ' + this.options.wechaty.version() + ` with CUID: ${this.cuid}`

    const initEvent: IoEvent = {
      name,
      payload,
    }
    // This also flushes whatever was buffered while the socket was down.
    this.forward(initEvent)
  }

  /**
   * Dispatch one frame pushed by the server.
   *
   * Frames that are not valid JSON keep the name `raw` and end up in the
   * `default` branch.
   */
  private wsOnMessage(data: WebSocket.Data): void {
    log.silly('Io', 'initWebSocket() ws.on(message): %s', data)

    if (typeof data !== 'string') {
      // Throwing here would escape from the `ws` event listener as an uncaught
      // exception, so report the problem and drop the frame instead.
      log.warn('Io', 'on(message) dropped a non-string frame')
      this.options.wechaty.emit('error', new Error('data should be string...'))
      return
    }

    const ioEvent: IoEvent = {
      name    : 'raw',
      payload : data,
    }

    try {
      const obj = JSON.parse(data)
      ioEvent.name    = obj.name
      ioEvent.payload = obj.payload
    } catch (e) {
      log.verbose('Io', 'on(message) recv a non IoEvent data[%s]', data)
    }

    switch (ioEvent.name) {
      case 'botie': {
        const payload = ioEvent.payload
        // `payload` is absent when the server sends a bare `{ "name": "botie" }`.
        if (payload && payload.onMessage) {
          const script = payload.script
          try {
            // SECURITY: this executes code received from the server with full
            // access to this process. It is kept inline (direct eval) so that the
            // script sees the same scope as before; it relies on the server and
            // the transport being trusted.
            /* tslint:disable:no-eval */
            const fn = eval(script)
            if (typeof fn === 'function') {
              this.onMessage = fn
            } else {
              log.warn('Io', 'server pushed function is invalid')
            }
          } catch (e) {
            log.warn('Io', 'server pushed function exception: %s', e)
            this.options.wechaty.emit('error', e)
          }
        }
        break
      }

      case 'reset':
        log.verbose('Io', 'on(reset): %s', ioEvent.payload)
        this.options.wechaty.reset(ioEvent.payload)
        break

      case 'shutdown':
        log.info('Io', 'on(shutdown): %s', ioEvent.payload)
        process.exit(0)
        break

      case 'update': {
        log.verbose('Io', 'on(report): %s', ioEvent.payload)

        // Re-send the current login and (for PuppetWeb) scan information.
        const puppet = this.options.wechaty.puppet
        const user   = puppet ? puppet.user : null

        if (user) {
          const loginEvent: IoEvent = {
            name    : 'login',
            payload : {
              id: user.id,
              name: user.name(),
            },
          }
          this.forward(loginEvent)
        }

        if (puppet instanceof PuppetWeb) {
          const scanInfo = puppet.scanInfo
          if (scanInfo) {
            const scanEvent: IoEvent = {
              name: 'scan',
              payload: scanInfo,
            }
            this.forward(scanEvent)
          }
        }

        break
      }

      case 'sys':
        // do nothing
        break

      case 'logout':
        log.info('Io', 'on(logout): %s', ioEvent.payload)
        this.options.wechaty.logout()
        break

      default:
        log.warn('Io', 'UNKNOWN on(%s): %s', ioEvent.name, ioEvent.payload)
        break
    }
  }

  // FIXME: it seems the parameter `e` might be `undefined`.
  // @types/ws might have a bug for `ws.on('error',    e => this.wsOnError(e))`
  private wsOnError(e?: Error) {
    log.warn('Io', 'initWebSocket() error event[%s]', e && e.message)
    this.options.wechaty.emit('error', e)

    // An `error` is expected to be accompanied by a `close` event, which
    // already triggers the reconnect - so do not reconnect from here.
  }

  /** Socket closed: make sure it is released, then schedule a reconnect. */
  private wsOnClose(
    ws      : WebSocket,
    code    : number,
    message : string,
  ): void {
    log.warn('Io', 'initWebSocket() close event[%d: %s]', code, message)
    ws.close()
    this.reconnect()
  }

  /**
   * Schedule a new `initWebSocket()` unless we are switched off, already
   * connected, or a reconnect is already pending. The delay starts at 1 ms
   * and is tripled on every attempt while it is below 10 seconds.
   */
  private reconnect() {
    log.verbose('Io', 'reconnect()')

    if (this.state.off()) {
      log.warn('Io', 'reconnect() canceled because state.target() === offline')
      return
    }

    if (this.connected()) {
      log.warn('Io', 'reconnect() on a already connected io')
      return
    }
    if (this.reconnectTimer) {
      log.warn('Io', 'reconnect() on a already re-connecting io')
      return
    }

    if (!this.reconnectTimeout) {
      this.reconnectTimeout = 1
    } else if (this.reconnectTimeout < 10 * 1000) {
      this.reconnectTimeout *= 3
    }

    log.warn('Io', 'reconnect() will reconnect after %d s', Math.floor(this.reconnectTimeout / 1000))
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null
      this.initWebSocket()
    }, this.reconnectTimeout)
  }

  /**
   * Fire-and-forget variant of `send()` for event listeners: nobody awaits
   * the promise there, so a rejection must be consumed here to avoid an
   * unhandled promise rejection.
   */
  private forward(ioEvent: IoEvent): void {
    this.send(ioEvent).catch(e => {
      // send() has already logged the failure at error level.
      log.verbose('Io', 'forward(%s) send() rejected: %s', ioEvent.name, e && e.message)
    })
  }

  /**
   * Queue `ioEvent` (when given) and, if the socket is open, write every
   * buffered event to it.
   *
   * @param ioEvent event to append to the buffer; omit to just flush
   * @throws the first error reported by the socket for any of the writes
   */
  private async send(ioEvent?: IoEvent): Promise<void> {
    if (ioEvent) {
      log.silly('Io', 'send(%s: %s)', ioEvent.name, ioEvent.payload)
      this.eventBuffer.push(ioEvent)
    } else {
      log.silly('Io', 'send()')
    }

    if (!this.connected()) {
      log.verbose('Io', 'send() without a connected websocket, eventBuffer.length = %d', this.eventBuffer.length)
      return
    }

    const ws      = this.ws
    // Take everything out of the buffer in one go, preserving order.
    const pending = this.eventBuffer.splice(0)
    const list    = pending.map(event => new Promise<void>((resolve, reject) => {
      ws.send(
        JSON.stringify(event),
        (err?: Error) => {
          if (err)  { reject(err) }
          else      { resolve()   }
        },
      )
    }))

    try {
      await Promise.all(list)
    } catch (e) {
      log.error('Io', 'send() exception: %s', e.stack)
      throw e
    }
  }

  /**
   * Flush the buffer, cancel any pending reconnect and close the socket.
   *
   * The cleanup always runs, even when the final flush fails; in that case
   * the flush error is still propagated to the caller afterwards.
   */
  public async quit(): Promise<void> {
    this.state.off('pending')

    try {
      // try to send IoEvents in buffer
      await this.send()
    } finally {
      this.eventBuffer = []

      if (this.reconnectTimer) {
        clearTimeout(this.reconnectTimer)
        this.reconnectTimer = null
      }

      // No socket exists when quit() is called before a successful init().
      if (this.ws) {
        this.ws.close()
      }

      this.state.off(true)
    }
  }

  /**
   * Handle a Wechaty `message` event. This is a no-op until the server has
   * installed a handler through a `botie` event.
   *
   * Failures of the pushed handler are logged and re-emitted as a Wechaty
   * `error` event, because the event listener that calls this method does not
   * await the returned promise.
   */
  private async ioMessage(m: unknown): Promise<void> {
    log.silly('Io', 'ioMessage() is a nop function before be overwriten from cloud')
    if (typeof this.onMessage !== 'function') {
      return
    }

    try {
      await this.onMessage(m)
    } catch (e) {
      log.warn('Io', 'ioMessage() server pushed function exception: %s', e)
      this.options.wechaty.emit('error', e)
    }
  }

}
443 444

export default Io