[실무의 벽 넘기] RxJava로 네트워크 로직 Reactive하게 만들기

[실무의 벽 넘기] RxJava로 네트워크 로직 Reactive하게 만들기

안드로이드 개발자로 일하게 되면서 혼자 앱을 만들 때와 실무에서 사용하는 기술 사이에 벽을 느꼈다. 지금 생각해보면 당연한 일이었지만 처음 소스코드를 받았을 때 이해되지 않는 코드를 마주한 건 충격이었다. 그 중 하나는 네트워크 함수들의 반환형이 Object, List 형이 아닌 Observable, Observable> 형으로 되어있던 것이다. 이 Observable의 정체를 파헤치다가 RxJava에 대해 알게 되었다. 이제부터 Observable의 정체는 무엇인지, RxJava는 무엇인지, 왜 네트워크 함수에 RxJava를 사용해야 했는지 나눠보려고 한다.

Rx + Java: ReactiveX란 무엇인가?

Java 앞에 붙은 Rx가 무슨 뜻인지 알아보는 것부터 시작하자. Rx는 ReactiveX(Reactive eXtensions)의 줄임말이다. ReactiveX는 여러 언어로 구현된 API이고 언어(및 플랫폼) 이름 앞에 Rx를 prefix로 붙여 이름을 지었다. 쉽게 말하면 RxJava는 ReactiveX를 Java 언어로 구현한 라이브러리인 것이다.

ReactiveX 언어별 라이브러리 목록: RxJava, RxSwift, RxAndroid가 포함되어 있다 

ReactiveX는 비동기(asynchronous) 작업을 반응형(reactive) 방식으로 처리하는 라이브러리이다. 서버에서 사진 10장을 받아 화면에 보여주는 앱을 만든다고 생각해보자. 개발자에게는 두가지 선택권이 있다.

1. 사진 10장을 받아올 때까지 기다린다 (멈춘다). 다운로드가 완료되면 화면에 보여준다.

2. 사진을 화면에 보여주는 콜백(Callback) 함수를 정의한다. 사진 10장을 받으면 콜백(Callback) 함수가 호출된다.

1번은 작업을 순서대로 처리하기 때문에 사진 10장을 받아올 때까지 아무 작업도 할 수 없다. 이렇게 순서대로 처리하는 방식을 동기적(synchronous)이라고 한다. 2번은 사진을 받아오는 작업이 종료될 때까지 기다리지 않고 다른 작업을 하고있다가 사진 10장을 받으면 콜백(Callback) 함수를 호출해 사진을 화면에 보여준다. 이러한 방식을 비동기적(asynchronous)이라고 한다.

당연하게도 2번 방식이 나아보인다. 하지만 ReactiveX는 세번째 선택권을 준다.

3. 사진을 1장씩 받을 때마다 화면에 표시한다. 사진을 받아오는 중간에는 다른 작업을 수행할 수 있다.

3번이 가능한 이유는 ReactiveX는 사진 10장을 받아오는 작업 중, 각 데이터(사진)의 이벤트에 반응형(reactive)으로 동작하기 때문이다. 마치 콜백(Callback) 함수들이 맞물려 10번 불리는 것과 같다. 하지만 실제로 콜백(Callback)을 10번 맞물리게 구현하는 것은 코드의 가독성이 떨어지고 잠재적으로 문제를 일으킬 수 있다. ReactiveX는 Callback의 문제를 최소화하면서 선언형(declarative)으로 간단하게 비동기 처리를 구현할 수 있도록 해준다. 이제 RxJava가 어떻게 이러한 ReactiveX의 마법을 가능하게 하는지 이야기해 보겠다.

Observable, Subscriber, Operator

RxJava를 이해하기 위해선 3가지 핵심 요소를 알아야 한다. 바로 Observable과 Subscriber, 그리고 Operator이다.

Observable은 직역하면 'observe + able', 관찰이 가능한 대상이다. 데이터가 떠내려오는 개울(stream)이 있다고 하자. 이 개울이 관찰 대상, Observable이다. 그리고 개울을 관찰(Observe)하는 사람을 Observer이라고 하자. RxJava에서 이 개울은 관찰하는 사람이 없으면 흐르지 않는다 (Cold Observable). 개울에 관찰하는 사람을 붙이는 것을 구독(Subscribe)한다고 하고 앞으로 관찰하는 사람을 Observer 대신 Subscriber라고 부를 것이다. 마지막으로 떠내려가는 데이터를 꺼내 특정한 변화를 시킨 뒤 관찰하는 사람(Subscriber)에게 전달해주는 사람을 Operator라 한다.

위 비유가 100% 정확한 것은 아니지만 RxJava를 처음 접하는 독자들을 위해 표현들을 엮어서 스토리를 만들어보았다. 조금 더 구체적으로 살펴보기 위해 게임 오버쿡드의 장면을 편집하여 시나리오를 만들었다. 아래 사진의 내용을 RxJava 코드로 옮겨보면서 Observable, Subscriber, 그리고 Operator를 이해해보자.

오버쿡드로 표현한 Observable, Subscriber, Operator

아래에 있는 루돌프(Subscriber)가 구독(Subscribe) 하면, 맨 위의 판다(Observable)는 갖가지 재료(데이터)를 내보낸다. 루돌프(Subscriber)는 데이터를 받으면 손님들에게 전달할 것이다. 이 과정에서 악어(Operator)는 데이터가 루돌프에게 전달되기 전에 재료를 자르고 접시에 담는다. 또 4가지 재료 중 생선과 오이만을 전달할 것이다.

이제 위 내용을 코드로 옮겨보자. 우선 판다(Observable)가 4가지 재료(다시마, 생선, 밥, 오이)를 내보내는 것은 RxJava로 다음과 같이 구현할 수 있다.

Observable panda = Observable.just("KELP", "FISH", "RICE", "CUCUMBER");

just() 함수는 인자로 넣은 데이터들을 순차적으로 내보낸다. 타입은 간단하게 String으로 지정하였다.

다음으로 루돌프(Subscriber)가 음식을 받아 손님에게 전달하는 것을 코드로 표현해보자.

Subscriber rudolph = new Subscriber() { @Override public void onNext(String food) { serveFoodToGuest(food); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { } };

Observable이 데이터를 하나씩 내보낼 때마다 onNext() 함수가 호출되어 serveFoodToGuest()를 실행한다. 모든 데이터를 내보내면 onCompleted()가, 중간에 에러가 발생하면 onError()가 호출된다.

serveFoodToGuest() 함수는 다음과 같이 구현했다고 하자.

public void serveFoodToGuest(String food) { System.out.println(food); }

이제 루돌프(Subscriber)가 판다(Observable)를 구독(Subscribe)하도록 하는 RxJava 코드는 다음과 같다.

panda.subscribe(rudolph);

구독(Subscribe)을 하면 Observable이 데이터를 내보내기 시작한다. 데이터를 내보낼 때마다 onNext() 함수가 호출되고 아래와 같은 결과가 나온다.

KELP FISH RICE CUCUMBER

악어(Operator)를 추가하기 전에 지금까지의 코드를 줄여보자. onNext만 인자로 전달하고 Java 8의 lambda 표현을 적용하면 Observable을 구독(Subscribe)하는 것을 한 줄로 구현할 수 있다.

Observable.just("KELP", "FISH", "RICE", "CUCUMBER") .subscribe(food -> serveFoodToGuest(food));

악어(Operator)는 재료를 자른 후 접시에 담는다. 그리고 생선과 오이만 루돌프(Subscriber)에게 전달한다. 우선 재료를 자르고 접시에 담는 함수를 각각 정의해보자. 필터링을 위한 함수도 같이 정의할 것이다.

public String chopFood(String food) { return String.join(" ", food.split("")); } public String plateFood(String food) { return "[ " + food + " ]"; } public boolean isFishOrCucumber(String food) { return "FISH".equals(food) || "CUCUMBER".equals(food); }

데이터를 변환하는 Operator은 map()이고 데이터를 필터링하는 Operator은 filter()이다. 마지막으로 악어(들)을 추가한 코드를 작성하면 다음과 같다.

Observable.just("KELP", "FISH", "RICE", "CUCUMBER") .map(food -> chopFood(food)) .map(food -> plateFood(food)) .filter(food -> isFishOrCucumber(food)) .subscribe(food -> serveFoodToGuest(food));

완성된 코드를 실행하면 아래와 같은 결과가 나온다.

[ K E L P ] [ F I S H ] [ R I C E ] [ C U C U M B E R ]

게임 오버쿡드의 RxJava 시나리오를 완성했다. Observable, Subscriber, Operator이 무엇인지, 어떻게 구현되는지 대략적으로 이해된다면 성공이다. 실제 RxJava에는 just() 함수 외에 Observable을 생성하는 여러 방법과 수십가지의 Operators가 있다. RxJava에서 제공하는 다양한 기능들을 확인하고 싶다면 ReactiveX 공식 홈페이지를 읽어보길 권한다.

안드로이드에서 RxJava 활용하기

맛집을 찾아다니다 보면 줄을 서서 웨이팅을 해야 할 때가 있다. 그럴때면 가끔 누군가 나 대신 줄을 서고 기다리는 동안 다른 일을 하면 좋겠다는 생각을 한다. 프로그래밍에서는 새로운 Thread에 작업을 맡기고 나(Main Thread)는 다른 일을 하는게 가능하다. 네트워크 요청과 같이 시간이 오래 걸리는 일이나 특정 시점까지 기다리는 작업은 새로운 Thread를 만들어 별도로 처리하는 것이 효율적이다. 하지만 Thread의 수가 늘어나게 되면 무슨 작업을 어느 Thread에서 하고 있는지 관리가 힘들어져 충돌이 생길 수 있다. RxJava에서는 이에 대한 해결책으로 Scheduler의 개념이 등장한다.

Scheduler를 이용하면 Observable과 Subscriber의 코드를 어느 Thread에서 수행할지 지정할 수 있다. 방법은 간단하다. subscribeOn() Operator을 추가해 Observable의 코드를 어느 Thread에서 수행할지 지정하고 observeOn() Operator을 추가해 Subscriber의 코드를 어느 Thread에서 수행할지 지정한다.

Observable.just("one", "two", "three", "four", "five") .subscribeOn(schedulerA) .observeOn(schedulerB) .subscribe(s -> System.out.println(s));

위 코드에서 Observable의 작업(one부터 five까지 차례로 내보내는 작업)은 schedulerA가 지정하는 Thread에서, Subscriber의 작업(문자열을 출력)은 schedulerB가 지정하는 Thread에서 수행한다. 이것을 활용하여 안드로이드 앱에서 사진을 다운로드 받아 화면에 표시하는 RxJava 코드를 만들어보자.

myObservableService.downloadImage(url) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(bitmap -> showImage(bitmap));

**AndroidSchedulers.mainThread()는 RxAndroid의 기능이다

이미지 다운로드는 I/O Thread에서, 이미지를 화면에 보여주는 것은 Main Thread에서 수행하게 된다 (안드로이드에서 화면을 변경하는 것은 오직 Main Thread만 가능하다). Operator 2개만 추가했는데 사진을 비동기적(asynchronous)으로 가져오는 멀티쓰레드 프로그램이 완성된 것이다. 참고로 RxJava(RxAndroid)를 사용하지 않고 이미지를 비동기적으로 가져오는 코드의 일부를 적으면 다음과 같다.

private class DownloadImageTask extends AsyncTask { @Override protected void onPreExecute() { super.onPreExecute(); } @Override protected Bitmap doInBackground(URL... urls) { Bitmap image = downloadImage(urls[0]); return image; } @Override protected void onProgressUpdate(Void... values) { super.onProgressUpdate(values); } @Override protected void onPostExecute(Bitmap image) { super.onPostExecute(image); showImage(image); } }

RxJava로 네트워크 로직 Reactive하게 만들기

마지막 단계는 RxJava를 안드로이드 앱의 비동기(asynchronous) 작업에 적용하는 것이다. RxJava를 사용하려면 우선 Maven을 통해 dependency를 추가해야 한다. build.gradle 파일에 아래와 같은 선언을 추가한다.

dependencies { implementation 'io.reactivex.rxjava3:rxandroid:3.x.x' implementation 'io.reactivex.rxjava3:rxjava:3.x.x' }

네트워크 요청 라이브러리 Retrofit에는 RxJava를 적용할 수 있다. 프로젝트에 Retrofit을 적용할 때는 기능별로 Service 인터페이스를 만들어 네트워크 함수들을 정의하는 것을 추천한다. SNS에서 친구로 등록된 사용자 목록을 가져오는 상황을 가정해보았다.

1. Callback을 정의한 Retrofit 코드

@GET("/main/friends") void getFriendList(Callback> callback);

2. RxJava를 활용한 Retrofit 코드

@GET("/main/friends") Observable> getFriendList();

이제 네트워크 요청을 하고 싶을 때 함수에서 반환하는 Observable을 구독(Subscribe)하면 서버에서 데이터를 가져온다. MVC 패턴을 사용한다면 Activity나 Fragment에, MVVM 패턴을 사용한다면 ViewModel에 Observable을 구독(Subscribe)하는 코드를 넣어 비동기 통신을 구현한다.

retrofit.create(FriendService.class) .getFriendList() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull List users) { // 네트워크 통신이 성공하면 이 코드가 실행된다 addHeart(users) } @Override public void onError(@NonNull Throwable e) { Log.d("friend", "ERROR: getFriendList"); } @Override public void onComplete() { } });

마지막으로 RxJava를 활용하는 장점을 정리해보고자 한다. RxJava를 적용하면 비동기 처리 코드의 가독성을 높이고 콜백(Callback) 방식에서 발생할 수 있는 문제점(ex: 콜백 지옥)을 개선할 수 있다. 또한 RxAndroid와 함께 사용하면 명시적인 Thread 활용이 가능하고 손쉽게 비동기 이벤트를 처리할 수 있다.

간단한 사례에서는 네트워크 로직에 반응형 프로그래밍(Reactive Programming)을 적용할 때의 장점이 잘 드러나지 않는다. 위에서 열거한 장점에 더해 RxJava를 100% 활용하려면 다양한 Oprators와 Observable형을 유연하게 사용할 수 있어야 한다. RxJava를 더 풍부하게 사용하는 것을 이 글의 과제로 남기고 싶다.

from http://juan-dev.tistory.com/2 by ccl(A) rewrite - 2021-11-27 01:02:03