以前こちらのQittaの記事でtakeしてもcompletedさせない!っていうのを書きましたが、
改めてこちらで詳しく書こうかなと思います
よくハマる罠
take
オペレーターを使うと指定した回数分までイベントが流れるように制限を加えることができます。
ただ、ここでうっかりハマってしまうのが、指定回数分イベントが流れた後に、completed
が送信されて完了してしまうということです。
Observable.of(1,2,3,4,5).take(2)
.subscribe { print($0) }
.disposed(by: disposeBag)
// next(1)
// next(2)
// completed
単体で使う分にはあまり影響がないのですが、他のストリームと組み合わせたりしているときに、このcompleted
が流れるとつられてcompleted
になってしまってイベントが流れない…なんてことになったりします。
「takeして指定回数分流れたらcompletedを流す」という通常使用でしたらそのままで良いのですが、「takeして指定回数分流れたらそのストリームは以降何も流れないようにストップさせる」という形で使うのであれば、次のようにします
take,concat,neverを組み合わせる
Rxにはnever
という、以降completed
にならなくなるオペレーターがあります。これをtakeと組み合わせることで、指定回数イベントを流した後にtake
から流れてくるcompleted
をせき止めることができます。
組み合わせる時には、concat
オペレーターを使います。 concat
とmerge
の違いはこちらの記事でも触れています。
extension ObservableType {
public func takeNoCompleted(_ count: Int) -> Observable<E> {
return .concat(take(count), .never())
// 略さず書くとこんな感じ
// return Observable.concat(self.take(count), Observable.never())
}
}
実際に使ってみると次のようになります。
Observable.of(1,2,3,4,5).takeNoCompleted(2)
.subscribe { print($0) }
.disposed(by: disposeBag)
// next(1)
// next(2)
// completedは流れない!
singleオペレーターとasSingle()
の場合だと?
特にtake(1)
で1回だけに制限したい場合に、他の代替案としてsingle
オペレーターを使うことも考えられますが、2回目以降はエラーが流れてくるので注意が必要です。
Observable.of(1,2,3,4,5).single()
.subscribe { print($0) }
.disposed(by: disposeBag)
// next(1)
// error(Sequence contains more than one element.)
もしsingle
で同じことを実現するには、エラーを次のように握りつぶしてあげれば良さそうです。
Observable.of(1,2,3,4,5).single()
.catchError { _ in Observable.never() }
.subscribe { print($0) }
.disposed(by: disposeBag)
// next(1)
asSingle()
でSingle Unitに変換した場合も同様にエラーが流れてきます。
Observable.of(1,2,3,4,5).asSingle()
.subscribe { print($0) }
.disposed(by: disposeBag)
// error(Sequence contains more than one element.)
こちらの場合は内部的にonNext
が1回きりだったか見ているため、そうでなければ値すら流れず、「Sequence contains more than one element.」のエラーが送信されます。
まとめ
takeにハマったら.debug()
を使ってcompleted
が流れていないか確認してみましょう。