RxJavaのcomposeっぽいものをRxSwiftで

Wednesday, April 26, 2017

今日たまたま同僚のAndroidエンジニアと話をしていて、 「RxSwiftにcomposeとかliftあればいいのにね」という話があったので、試しにcomposeっぽいものを作ってみました。

もしも既に全く同じようなのがRxSwiftにあったらごめんなさい。チョットジシンナイデス。

その前にcomposeって何よ

さっくり説明すると、

  • Observableのオペレーターで、引数で受け取るTransformerを自身に適応してObservableを返す
  • TransformerObservableを受け取って、それを加工してObservableを返すもの

のようです。さっくりですみません。
詳しくはこちらでしっかりとした解説があります。

flatMapと似てそうですが、
flatMapが元のObservableから送信されたイベント(値)を元に新たなObservableを返却するのに対し、
composeは元のObservable自体を受け取って、それにTransformerを適応して新たなObservableを返却するので、 元のObservableを直接触れるかObservableのイベント(値)を触れるか が違いになってきそうです。

このあたりの説明に関してはこちらが参考になります。

実装してみる

ということで実装してみます。
composeオペレーターはObservableTypeのextensionで実装し、それとは別にTransformerstructで実装します。

struct ComposeTransformer<T, R> {
    let transformer: (Observable<T>) -> Observable<R>
    init(transformer: @escaping (Observable<T>) -> Observable<R>) {
        self.transformer = transformer
    }

    func call(_ observable: Observable<T>) -> Observable<R> {
        return transformer(observable)
    }
}

extension ObservableType {
    func compose<T>(_ transformer: ComposeTransformer<E, T>) -> Observable<T> {
        return transformer.call(self.asObservable())
    }
}

便宜上TransformerはComposeTransformerとして定義しました。

使ってみる

例えば、 Observableに対してsubscribeOnでBackgroundSchedulerを、observeOnでMainSchedulerを適応する applySchedulers(_:) という関数を考えてみます。

func applySchedulers<T>(_ observable: Observable<T>) -> Observable<T> {
    return observable
        .subscribeOn(SerialDispatchQueueScheduler(qos: .default))
        .observeOn(MainScheduler.instance)
}

これを使って、1~5までの数字を足し合わせる処理をbackgroundで実行し、その後mainで結果を出力するストリームを組んでみます

applySchedulers(
    Observable
        .just([1, 2, 3, 4, 5])
        .map{ $0.reduce(0, +) }
    )
    .subscribe({ event in
        print(event)
    })

書き方が良いとは言えないですね。
ここで、先ほど定義したcomposeを使ってみます。

Observable.just([1, 2, 3, 4, 5])
    .map{ $0.reduce(0, +) }
    .compose(ComposeTransformer<Int, Int> { observable in
        return observable
            .subscribeOn(SerialDispatchQueueScheduler(qos: .default))
            .observeOn(MainScheduler.instance)
    }).subscribe({ event in
        print(event)
    })

型推論がしっかり効く状態なら、<Int, Int>の部分は省略しても大丈夫です。この例だとObservable<Int>にsubscribeOnとobserveOnを付与してObservableを返しているだけなので省略することができます。

.compose(ComposeTransformer { observable in
    return observable
        .subscribeOn(SerialDispatchQueueScheduler(qos: .default))
        .observeOn(MainScheduler.instance)
})

もちろん、以下のようなものをextensionで加えるのもありですが、composeTransformerを使ったほうが任意の処理を繋げられるので良さそうです。

extension ObservableType {
    func applySchedulers() -> Observable<E> {
        return self.asObservable()
            .subscribeOn(SerialDispatchQueueScheduler(qos: .default))
            .observeOn(MainScheduler.instance)
    }
}

さいごに

flatMapcomposeが最初なかなか飲み込めなかったのですが、ようやく理解できてきた感じでした。
いつものようにGistにまとめてあります。

余談

実はTransformerはobjectではなくて単なるclosureで良い気もしてきましたが、どっちが良いのでしょう…。

extension ObservableType {
    func compose<T>(_ transformer: (Observable<E>) -> Observable<T>) -> Observable<T> {
        return transformer(self.asObservable())
    }
}

Observable.just([1,2,3,4,5])
    .map{ $0.reduce(0, +) }
    .compose({ observable in
        return observable
            .subscribeOn(SerialDispatchQueueScheduler(qos: .default))
            .observeOn(MainScheduler.instance)
    }).subscribe({ event in
        print(event)
    })

一度何かの変数に入れて再利用を考えると、前者の方が良さそうな気はします。
余談でした。

参考

techRxSwiftSwiftcomposeRxJava

続・takeしてもcompletedにさせない

FirebaseAuth+RxSwift