RxJava SubscribeOn과 ObserveOn의 동작 차이
참고 : https://github.com/ReactiveX/RxAndroid/blob/2.x/README.md
SubscribeOn
Observable의 create()부터 onNext(), onComplete(), onError()까지 모든 과정을 subscribeOn()으로 지정한 스레드에서 실행시킨다.
ObserveOn
Observable의 메소드체인 호출 사이에서 observeOn()이 호출된 지점부터 onNext(), onComplete(), onError()까지의 과정을 observeOn()으로 지정한 스레드에서 실행시킨다.
public class MainActivity extends Activity {
private static final String TAG = "RxAndroidSamples";
private final CompositeDisposable disposables = new CompositeDisposable();
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.main_activity);
findViewById(R.id.button_run_scheduler).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
onRunSchedulerExampleButtonClicked();
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
disposables.clear();
}
void onRunSchedulerExampleButtonClicked() {
disposables.add(sampleObservable()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
// subscribeOn()에 의해 Observable의 create()와 subscribeWith()에 구현된 onNext(), onComplete(), onError() 모두 io스레드에서 실행돼야 한다.
// 하지만 observeOn()에 의해 subscribeWith()에 구현된 onNext(), onComplete(), onError()는 main스레드에서 실행된다.
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onComplete() {
Log.d(TAG, "onComplete() " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError() " + Thread.currentThread().getName());
}
@Override
public void onNext(String string) {
Log.d(TAG, "onNext(" + string + ") " + Thread.currentThread().getName());
}
}));
}
static Observable<String> sampleObservable() {
return Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
Log.d(TAG, "create() " + Thread.currentThread().getName());
return Observable.just("one");
}
});
}
}