Moonglade

使用 RxJS 掌控异步

February 01, 2017


教你使用 RxJS 在 200 行代码内优雅的实现文件分片断点续传

Intro

ben lesh 经常在他的各种 talking 中将 RxJS 比作 Lodash for Async 用来彰显 RxJS 的强大异步控制能力,而 RxJS 对于异步而言确实有着媲美 lodash 之于 Array 的强大功能。与 lodash 的优秀性能类似,RxJS 在操作异步任务的时的性能也是非常优秀的,并不会因为高等级的抽象而牺牲过多的性能。本篇文章将会以一个相对复杂的异步任务为例子,逐步介绍 RxJS 如何简洁优雅的进行复杂的异步控制。

准备工作

learning-rxjs clone 项目所需的 seed,并基于 article3-seed checkout 一个你的 article3 分支。本文中所有涉及到 RxJS 的代码将全部使用 TypeScript 编写。

这篇文章中,我们将使用 RxJS 实现以下功能:

article3-seed 分支附带了一个简单的文件上传的 server,它的作用是实现一个简单的文件分片上传 API。

一共有三个 API 提供调用:

  • post /api/upload/chunk

    用来获取文件的分片信息,上传文件大小,文件 md5,文件修改时间和文件名

    服务端返回文件分片数量,每个分片大小和文件唯一的 fileKey

    • Request body:
{
  fileSize: string // 文件大小
  fileMD5: string // 文件 md5
  lastUpdated: ISOString // 文件上次修改时间
  fileName: string // 文件名
}
  • Response:
{
  chunkSize: number // 每个分片大小
  chunks: number // 分片数量
  fileKey: string // 唯一文件 id
}
  • post /api/upload/chunk/:fileKey?chunk=:chunk&chunks=:chunks

    用来上传文件分片

    • Request header: 'Content-Type': 'application/octet-stream'
    • Request body: blob
    • Response: 'ok' or error message
  • post /api/upload/chunk/:fileKey

    结算文件分片,后端会将各分片拼接成一个完整的文件并返回

    Response: 'ok' or error message

在这几个 API 的基础上,我们将在这篇文章中实现以下的功能

  1. 在 add 按钮左边增加一个按钮,用来选择一个文件 & (暂停 & 恢复) 文件的上传

  2. 在增加文件后:

    • 计算文件 md5 ,文件名,文件上次修改时间,文件大小等信息
    • 调用 post /api/upload/chunk 接口,获取文件分片信息
    • 根据获取的分片信息对文件分片,并且调用 post /api/upload/chunk/:fileKey?chunk=:chunk&chunks=:chunks 上传文件分片,上传过程保证每次只有三个分片同时上传
    • 上传完所有的分片后,调用 post /api/upload/chunk/:fileKey 结算文件
  3. 上传的过程中,input 框下有一个进度条显示上传进度

  4. 在上传开始后,选择文件的按钮变成暂停按钮,点击则暂停上传

  5. 点击暂停上传按钮后,按钮变成继续上传按钮,点击则在暂停的地方继续上传

为了实现上面的功能,并且将这些功能与之前的 todo app 区分开来,我们在 src 文件夹下新建一个 FileUploader.ts 文件并在这个文件中实现这些需求:

// FileUploader.ts
import { Observable } from 'rxjs'

// @warn memory leak
const $attachment = document.querySelector('.attachment')

export class FileUploader {
  private file$ = Observable.fromEvent($attachment, 'change')
    .map((r: Event) => (r.target as HTMLInputElement).files[0])
    .filter((f) => !!f)

  uploadStream$ = this.file$
}

在 html 中加入 attachment 节点:

// index.html ...
<div class="input-group-btn">
  <label
    class="btn btn-default btn-file glyphicon glyphicon-paperclip attachment"
  >
    <input type="file" style="display: none;" />
  </label>
  <div class="btn btn-default button-add">Add</div>
</div>
...

调整一下样式:

// style.css
... .attachment {
  top: 0;
}

然后在 app.ts 中将这个我们将要实现功能的流 mergeapp$ 中:

...
import { FileUploader } from './FileUploader'
...
const uploader = new FileUploader()

const app$ = toggle$.merge(remove$, search$, uploader.uploadStream$)
  .do(r => {
    console.log(r)
  })

app$.subscribe()

这个时候通过 attachment 按钮选择一个文件,就已经可以在控制台中看到从 app$ 中流出的 file了:

1485952042909

获取文件分片信息

我们使用 FileReader + spark-md5 计算文件的 md5 信息,其它信息直接可以从 File 对象上拿到。而这里的 FileReader 读取文件是一个异步的过程,我们将它封装成 Observable 以便和 uploadStream$ 组合:

import { Observable, Observer } from 'rxjs'
// spark-md5 没有第三方 .d.ts 文件,这里用 commonjs 风格的 require 它
// 如果未再 tsconfig.json 中设置 noImplicitAny: true 且 TypeScript 版本大于 2.1 则也可以用
// import * as SparkMD5 from 'spark-md5' 的方式引用
const SparkMD5 = require('spark-md5')
const $attachment = document.querySelector('.attachment')

interface FileInfo {
  fileSize: number
  fileMD5: string
  lastUpdated: string
  fileName: string
}

export class FileUploader {
  private file$ = Observable.fromEvent($attachment, 'change')
    .map((r: Event) => (r.target as HTMLInputElement).files[0])
    .filter((f) => !!f)

  uploadStream$ = this.file$.switchMap(this.readFileInfo)

  private readFileInfo(
    file: File,
  ): Observable<{ file: File; fileinfo: FileInfo }> {
    const reader = new FileReader()
    const spark = new SparkMD5.ArrayBuffer()
    reader.readAsArrayBuffer(file)
    return Observable.create(
      (observer: Observer<{ file: File; fileinfo: FileInfo }>) => {
        reader.onload = (e: Event) => {
          spark.append((e.target as FileReader).result)
          const fileMD5 = spark.end()
          observer.next({
            file,
            fileinfo: {
              fileMD5,
              fileSize: file.size,
              lastUpdated: file.lastModifiedDate.toISOString(),
              fileName: file.name,
            },
          })
          observer.complete()
        }
        return () => {
          if (!reader.result) {
            console.warn('read file aborted')
            reader.abort()
          }
        }
      },
    )
  }
}

此时已经可以看到文件的 FileInfo可以从 app$ 中流出:

1485952954068

再使用文件信息通过 post /api/upload/chunk 接口获取文件的分片信息:

...
const apiHost = 'http://127.0.0.1:5000/api'
...

interface ChunkMeta {
  fileSize: number
  chunkSize: number
  chunks: number
  fileKey: string
}
...

export class FileUploader {
  ...
  uploadStream$ = this.file$
    .switchMap(this.readFileInfo)
    .switchMap(i => Observable.ajax
      .post(`${apiHost}/upload/chunk`, i.fileinfo)
    )
}

1485953393406

分片上传

获取分片信息之后,我们首先要做的事情是将文件按照分片信息分片,做一个 slice 方法来将文件分片:

...
export class FileUploader {
  ...
  uploadStream$ = this.file$
    .switchMap(this.readFileInfo)
    .switchMap(i => Observable.ajax
      .post(`${apiHost}/upload/chunk`, i.fileinfo)
      .map((r) => {
        const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize)
        return { blobs, chunkMeta: r.response }
      })
    )

  ...

  private slice(file: File, n: number, chunkSize: number): Blob[] {
    const result: Blob[] = []
    for (let i = 0; i < n; i ++) {
      const startSize = i * chunkSize
      const slice = file.slice(startSize, i === n - 1 ? startSize + (file.size - startSize) : (i + 1) * chunkSize)
      result.push(slice)
    }
    return result
  }
}

这时,我们就能看到分片后的 blobs 和 meta 信息:

210237

将文件切片完成之后,我们需要实现一个上传分片的方法:

...
export class FileUploader {
  ...

  uploadStream$ = this.file$
    .switchMap(this.readFileInfo)
    .switchMap(i => Observable.ajax
      .post(`${apiHost}/upload/chunk`, i.fileinfo)
      .map((r) => {
        const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize)
        return { blobs, chunkMeta: r.response }
      })
    )
    .switchMap(({ blobs, chunkMeta }) => {
      const dists = blobs.map((blob, index) => this.uploadChunk(chunkMeta, index, blob))
      const uploadStream = Observable.from(dists)
        .mergeAll(this.concurrency)

      return Observable.forkJoin(uploadStream)
        .mapTo(chunkMeta)
    })

  constructor(
    private concurrency = 3
  ) { }
  ...
  private uploadChunk(meta: ChunkMeta, index: number, blob: Blob) {
    const host = `${apiHost}/upload/chunk/${meta.fileKey}?chunk=${index + 1}&chunks=${meta.chunks}`
    return Observable.ajax({
      url: host,
      body: blob,
      method: 'post',
      crossDomain: true,
      headers: { 'Content-Type': 'application/octet-stream' }
    })
  }
}

这里的 uploadChunk 是上传单个文件分片的方法,uploadStream$ 中最后面一个 switchMap 中的逻辑是使用 mergeAll 操作符将所有上传的流 merge 成一个 Observable,行为就是并发的上传所有的分片。而下面的 forkJoin 操作符则是等 merge 之后的 uploadStream complete 之后再 emit 一个结果。这里的 mergeAll + forkJoin 的用法其实与 Promise.all 的行为非常类似,这里也可以写成:

...
const dists = blobs.map((blob, index) => this.uploadChunk(chunkMeta, index, blob))

return Observable.forkJoin(... dists)
  .mapTo(chunkMeta)
...

但我们有一个需求是 上传过程保证每次只有三个分片同时上传 , 所以需要使用 mergeAll 方法并传入 concurrency = 3 来控制并发数量,现在可以选择一个文件在 Devtool 上观察上传的行为。如果程序没有出问题行为应该是:并发上传文件分片,并且永远只有 3 个分片同时上传,在上传完所有分片后 app$ 中流出 chunkMeta 数据。

最后,我们只需要结算这些分片,这个文件就算上传完成了:

...
export class FileUploader {
  ...
  uploadStream$ = this.file$
    .switchMap(this.readFileInfo)
    .switchMap(i => Observable.ajax
      .post(`${apiHost}/upload/chunk`, i.fileinfo)
      .map((r) => {
        const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize)
        return { blobs, chunkMeta: r.response }
      })
    )
    .switchMap(({ blobs, chunkMeta }) => {
      const dists = blobs.map((blob, index) => this.uploadChunk(chunkMeta, index, blob))
      const uploadStream = Observable.from(dists)
        .mergeAll(this.concurrency)

      return Observable.forkJoin(uploadStream)
        .mapTo(chunkMeta)
    })
    .switchMap((r: ChunkMeta) => Observable.ajax.post(`${apiHost}/upload/chunk/${r.fileKey}`)
      .mapTo({
        action: 'UPLOAD_SUCCESS',
        payload: r
      })
    )
}

这时,选择一个文件后,可以看到它被分片上传,并且在结算后在 $app 中发送了一条数据:

{
  "action": "UPLOAD_SUCCESS",
  "payload": {
    "chunkSize": 1048576,
    "chunks": 26,
    "fileKey": "00a12bdc10449d8ec93883a7d45292a30c",
    "fileSize": 26621938
  }
}

并且在项目的 chunks 文件夹下面可以找到这个被结算的文件。

进度条

为了实现在界面中实时显示进度条,我们先要在 index.html 中加入进度条标签:

// index.html ...
<div class="progress">
  <div
    class="progress-bar progress-bar-success"
    role="progressbar"
    aria-valuenow="0"
    aria-valuemin="0"
    aria-valuemax="100"
    style="width: 0%"
  >
    <span>0%</span>
  </div>
</div>
...

调整一下样式中文字的颜色:

// style.css
... .progress-bar > span {
  color: black;
}

这个时候界面看起来应该是这样的:

213506

要获取总体的上传进度,必须先获取单个分片的上传进度,Observable.ajax 有一个方法可以获取 progress:..

...
import { Observable, Observer, Subscriber } from 'rxjs'
...

export class FileUploader {
  ...

  private uploadChunk(meta: ChunkMeta, index: number, blob: Blob): Observable<ProgressEvent> {
    const host = `${apiHost}/upload/chunk/${meta.fileKey}?chunk=${index + 1}&chunks=${meta.chunks}`
    return Observable.create((subscriber: Subscriber<ProgressEvent>) => {
      const ajax$ = Observable.ajax({
        url: host,
        body: blob,
        method: 'post',
        crossDomain: true,
        headers: { 'Content-Type': 'application/octet-stream' },
        progressSubscriber: subscriber
      })
      const subscription = ajax$.subscribe()
      return () => subscription.unsubscribe()
    })
  }
}

这样一来我们就可以在 uploadSteram$ 中计算总体的上传进度了:

...

export class FileUploader {

  uploadStream$ = this.file$
    .switchMap(this.readFileInfo)
    .switchMap(i => Observable.ajax
      .post(`${apiHost}/upload/chunk`, i.fileinfo)
      .map((r) => {
        const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize)
        return { blobs, chunkMeta: r.response }
      })
    )
    .switchMap(({ blobs, chunkMeta }) => {
      const uploaded: number[] = []
      const dists = blobs.map((blob, index) => {
        let currentLoaded = 0
        return this.uploadChunk(chunkMeta, index, blob)
          .do(r => {
            currentLoaded = r.loaded / chunkMeta.fileSize
            uploaded[index] = currentLoaded
            const percent = uploaded.reduce((acc, val) => acc + (val ? val : 0))
            const p = Math.round(percent * 100)
            $progressBar.style.width = `${p}%`
            $progressBar.firstElementChild.textContent = `${p > 1 ? p - 1 : p} %`
          })
      })

      const uploadStream = Observable.from(dists)
        .mergeAll(this.concurrency)

      return Observable.forkJoin(uploadStream)
        .mapTo(chunkMeta)
    })
    .switchMap((r: ChunkMeta) => Observable.ajax.post(`${apiHost}/upload/chunk/${r.fileKey}`)
      .mapTo({
        action: 'UPLOAD_SUCCESS',
        payload: r
      })
    )
    .do(() => {
      $progressBar.firstElementChild.textContent = '100 %'
    })
}

这个时候我们可以在界面中看到文件分片上传的进度了。

而一般为了方便使用与调试,我们一般将所有的类似:

{
  "action": "UPLOAD_SUCCESS",
  "payload": {
    "chunkSize": 1048576,
    "chunks": 26,
    "fileKey": "00a12bdc10449d8ec93883a7d45292a30c",
    "fileSize": 26621938
  }
}

的 local state 放在一个流里面:

import { Observable, Subscriber, Subject } from 'rxjs'
...
type Action = 'pause' | 'resume' | 'progress' | 'complete'
...
export class FileUploader {
  ...
  private action$ = new Subject<{
    name: Action
    payload?: any
  }>()

  private progress$ = this.action$
    .filter(action => action.name === 'progress')
    .map(action => action.payload)
  	.do(r => {
      const percent = Math.round(r * 100)
      $progressBar.style.width = `${percent}%`
      $progressBar.firstElementChild.textContent = `${percent > 1 ? percent - 1 : percent} %`
  	})
    .map(r => ({ action: 'PROGRESS', payload: r }))


  uploadStream$ = this.file$
    ...

        return this.uploadChunk(chunkMeta, index, blob)
          .do(r => {
            currentLoaded = r.loaded / chunkMeta.fileSize
            uploaded[index] = currentLoaded
            const percent = uploaded.reduce((acc, val) => acc + (val ? val : 0))
            this.action$.next({ name: 'progress', payload: percent })
          })
	...
    .merge(this.progerss$)
}

这时控制台会出现更直观的调试信息:

221058

暂停,续传

根据需求,我们在选择文件后,选择文件的按钮将会变成一个暂停按钮,我们可以用 Observable.fromEvent来实现这个需求:

...
export class FileUploader {
  ...

  private click$ = Observable.fromEvent($attachment, 'click')
    .map((e: Event) => e.target)
    .filter((e: HTMLElement) => e === $attachment)
    .scan((acc: number, val: HTMLElement) => {
      if (val.classList.contains('glyphicon-paperclip')) {
        return 1
      }
      if (acc === 2) {
        return 3
      }
      return 2
    }, 3)
    .filter(v => v !== 1)
  	.do((v) => {
      if (v === 2) {
        this.action$.next({ name: 'pause' })
        $attachment.classList.remove('glyphicon-pause')
        $attachment.classList.add('glyphicon-play')
      } else {
        this.action$.next({ name: 'resume' })
        this.buildPauseIcon()
      }
    })

  uploadStream$ = this.file$
	.switchMap...
  	.switchMap...
  	.do(() => this.buildPauseIcon())
  	...
    .do(() => {
      $progressBar.firstElementChild.textContent = '100 %'
      // restore icon
      $attachment.classList.remove('glyphicon-pause')
      $attachment.classList.add('glyphicon-paperclip');
      ($attachment.firstElementChild as HTMLInputElement).disabled = false
    })
    .merge(this.progress$, this.click$)

  // side effect
  private buildPauseIcon() {
    $attachment.classList.remove('glyphicon-paperclip')
    $attachment.classList.add('glyphicon-pause');
    ($attachment.firstElementChild as HTMLInputElement).disabled = true
  }
}

这段代码用到涉及到的概念比较多,我们一点点来理解:

uploadStream$ 的两个 switchMap 下插入了一个 do 操作符,这段代码的作用是将文件上传的图标变成暂停的图标。

然后我们新建了一个 click$ 流,为了防止事件冒泡导致的重复推送值,我们用 map + filter 过滤掉了子节点冒泡上来的事件。而为了区分点击的是 上传文件按钮还是 暂停按钮还是 继续按钮,我们用 1,2,3 三个值代表三个不同的点击事件,并使用 scan操作符不停的生成这三个状态。scan 的行为与 Array#reduce 非常相似,它接受一个 accumulator 不停的根据当前的值和状态累加出新的状态(没错,和 Redux 中的 reducer 行为一致)。而在下面的 do 操作符中我们根据不同的状态改变按钮的icon

这个时候我们观察上传的过程中,点击暂停/继续,图标的状态可以正确切换了。并且在上传完成后图标也被恢复成上传文件的初始状态了。

为了让整个文件上传可以暂停与继续,我们在 uploadChunk 下使用 takeUntilrepeatWhen & retryWhen 操作符:

...
export class FileUploader {
  ...
  private action$ = ...
  private pause$ = this.action$.filter(ac => ac.name === 'pause')
  private resume$ = this.action$.filter(ac => ac.name === 'resume')
  private progress$ = this.action$
    ...
    .distinctUntilChanged((x: number, y: number) => x - y >= 0)
    ...

  ...

  private uploadChunk(meta: ChunkMeta, index: number, blob: Blob): Observable<ProgressEvent> {
    ...
    return Observable.create(
      ...
      const ajax$ = Observable.ajax({
      ...
      })
        .takeUntil(this.pause$)
        .repeatWhen(() => this.resume$)
      const subscription = ajax$.subscribe()
      return () => subscription.unsubscribe()
    )
      .retryWhen(() => this.resume$)
  }
}

takeUntil 操作符接受一个 Observable,它在这个 Observable 发射值的时候终止上面的 Observable

repeatWhenretryWhen 操作符都是接受一个 projectFunc,它返回一个 Observable 并在这个 Observable 发射值的时候 重复/重试。

而在暂停恢复的过程中,进度条的数字可能显示错误:上传了一部分的请求被 abort,它的 progress 已经计算过一次了,重试的时候是重新上传,则可能会导致进度条后退,这时我们在 progress$ 后面用 distinctUntilChanged 方法即可实现 只有在进度增长的时候发射值 这一效果。

结语

这是一篇超级抽象的文章,并且受限于未使用框架,在程序中使用了大量的副作用操作do,总体看起来并没有特别优雅。真正优雅的 FRP 应该是将 RxJS 与 Redux + React 这样的框架结合起来,那时这个文件上传的组件就可以有更优雅的写法。当然它的功能并不完备,很多 edge case 例如各个步骤中的异常处理都没有做,但没有关系,这里只起到一个示范作用来展示 RxJS 在处理异步上的强大功能,并且让初学者有机会亲手把玩 RxJS 的各种操作符并实现一个复杂的异步场景。在后面的文章中,将会深入前面这三篇文章中涉及到或未涉及到的各种操作符,逐渐拨开 RxJS 的迷雾。


Written by 太狼
Frontend Developer at day, Rustacean at night.