RxSwiftでは、処理をどのスレッドで実行するか指定するSchedulerが用意されていて、
このSchedulerをセットすることで、それぞれの処理に対して実行するスレッドを指定できます。
その時のSchedulerの作り方と、Schedulerを使う時にお行儀よく扱うための備忘録兼ねたメモです。
(本当にメモ程度に書いてるので所々表現おかしかったりするかもです。ゴメンナサイ…)

それぞれのScheduler


  • メインスレッドで実行するScheduler
let mainScheduler = MainScheduler.instance

MainScheduler.instanceを呼び出してあげれば良いようです。
このMainSchedulerは シングルトン になっているので、都度作られることはないようです。
中身を除いてみると、処理を内部で実行スレッドをチェックしたあと、必要があればメインスレッドで実行するようにしています。
GCDの、 dispatch_async()dispatch_get_main_queue() を渡しているようです。

let backgroundScheduler = SerialDispatchQueueScheduler(globalConcurrentQueueQOS: .Default)

もしSerial(直列)にバックグラウンドで実行する場合には、SerialDispatchQueueSchedulerを用います。 globalConcurrentQueueQOS には、 バックグラウンドで実行するときの優先度を指定します。これも、GCDのQoS(Quality of Service)と同様のものとなっています。

これで作成されたSchedulerは、GCDのdispatch_queue_create("label", DISPATCH_QUEUE_SERIAL)と同様に作用します。
同様に、もしConCurrent(並列)にバックグラウンドで実行したい場合には、ConcurrentDispatchQueueSchedulerを用いればできます。

他にも、OperationQueueを使ったScheduler等も作ることができるようです。
ここに一覧がありますRecursiveSchedulerとかどういう時に使うんだろうか。。
あとはそれぞれ必要なところで、subscribeOnobserveOnにschedulerを渡してあげれば、指定したスレッドで動くようになります。
基本的に一度schedulerを指定したら、再度指定があるまでは、そのScheduler上で動くようです。

例えば、

  • ★ : observeOnで指定する (それ以降が指定したSchedulerで動く)
  • ☆ : subscribeOnで指定する (それ以前が指定したSchedulerで動く)

とした場合は、

  • 1) ★(Background)→A→B→★(Main)→C→D
  • 2) A→B→☆(Background)→★(Main)→C→D

どちらも同じように、 A, Bはバックグラウンドで、C, Dはメインスレッド上で処理が動きます。

どういった時に使うか


例えば、

  • A: APIを叩いてデータを取得
  • B: データのパース
  • C: パースしたデータ中のURLを使って画像を取得
  • D: 取得した画像をリサイズする
  • E: 画像をcellに貼り付ける

とした時に、

  • ★(Background)→A→B→C→D→★(Main)→E

(※ ★はObserveOn)
としてあげれば、データ取得や、時間のかかるリサイズ処理はバックグラウンドで行って、画像を貼り付けて描画する時はメインスレッド上で実行する、
といった使い方ができそうです。(UIの更新はメインスレッドでないと正しく反映されないので)

Schedulerの後処理というか、お行儀よく使うためには


サンプルを眺めてて思ったことを。

private func rx_JSON(URL: NSURL) -> Observable<AnyObject> {
    return $.URLSession
    .rx_JSON(URL)
    .trackActivity(loadingWikipediaData)
}

// Example wikipedia response http://en.wikipedia.org/w/api.php?action=opensearch&search=Rx
func getSearchResults(query: String) -> Observable<[WikipediaSearchResult]> {
    let escapedQuery = query.URLEscaped
    let urlContent = "http://en.wikipedia.org/w/api.php?action=opensearch&search=\(escapedQuery)"
    let url = NSURL(string: urlContent)!

    return rx_JSON(url)
    .observeOn($.backgroundWorkScheduler) // ★1
    .map { json in // ここは★1の指定があったのでバックグラウンド上で動く
        guard let json = json as? [AnyObject] else {
            throw exampleError("Parsing error")
        }

        return try WikipediaSearchResult.parseJSON(json)
    }
    .observeOn($.mainScheduler) // ★2
}

こんな感じでwikipediaからjsonを取得して、検索結果として[WikipediaSearchResult]を返す関数が用意されていますが、
最初のrx_JSON自体はNSURLSessionで通信が走るので、結果を受け取るまではバックグラウンド上動いていますが、結果を受け取った後は特に指定がなければその後の処理はメインスレッドで動作することになります。
そこで、取得した結果をWikipediaSearchResultにパースする処理が重たい場合、そのままメインスレッド上でやってしまうと画面が固まるおそれがあるので、一度バックグラウンドに逃がしています( ★1 )。
その後、パースが終わったらその後に繋げる処理がメインスレッドで動くように再度MainSchedulerを指定しています( ★2 )。
この★2を忘れてしまうと、

   // もし、getSearchResultsで、最後の
  // .observeOn($.mainScheduler) がなかった場合
   WikipediaAPI().getSearchResults("query").subscribeNext { [weak self] in
       print($0) //  ★3: これはバックグラウンド上で動く
       self?.tableView.reloadData() // これはまずい...
   }

( ★3 )の処理が バックグラウンド 上で処理されてしまいます。これがUIの更新とかを伴うと非常に まずい
なので、 ★2 のように、特にその後もバックグラウンドで実行する必要がない限りは、バックグラウンドに移したものは再度、メインスレッドに戻してから返すようにした方が お行儀が良さそう です。
特に、一連の流れを組んだストリームを返す関数を定義する場合は気をつけたほうがよさそうです。

まとめ


  • メインスレッドで動くSchedulerは、MainScheduler.instanceを使う
  • バックグラウンドで動くSchedulerは、SerialDispatchQueueScheduler等から生成して使う
  • observeOnsubScribeOnの違いに注意
  • バックグラウンドで動くSchedulerでの処理が終わった後、継続してバックグラウンドでの操作をする必要がない場合は、
    observeOn(MainScheduler.instance)をしてから返すとお行儀が良さそう。

参考