on
RxJava - 01
RxJava - 01
이 글은 RxJava 리액티브 프로그래밍(저자 스다 토모유키) 책을 정리한 글입니다.
책에 나와있는 Java 코드는 전부 Kotlin으로 변환하여 작성하였습니다. (RxKotlin)
책 정보: https://book.naver.com/bookdb/book_detail.nhn?bid=14689555
RxJava Chapter 01
RxJava란 무엇인가?
Java에서 리액티브 프로그래밍을 구현하는데 사용하는 라이브러리.
2.0 버전부터 Reactive Streams를 이용하여 데이터 스트림을 비동기로 처리하는 인터페이스를 제공한다.
기본 사용법 (in Kotlin)
val flowable = Flowable.just("Hello", "World") flowable.subscribe { println(it) }
출력 결과
Hello World
just 메소드 인자로 전달된 데이터를 통지하는 생산자 (Flowable)을 생성
생산자가 데이터를 통지하면 소비자가 받은 데이터를 출력
위처럼 데이터를 통지하는 생산자를 생성하고 해당 데이터를 소비자가 받아서 처리하는 방식이 RxJava의 기본 방식이다.
리액티브 프로그래밍(Reactive Programming)
데이터가 통지 될 때 마다 반응하여 데이터를 처리하는 방식.
리액티브 프로그래밍에서 데이터를 생산하는측은 데이터를 소비하는 측과 분리되어 로직을 알 필요가 없기 때문에, 데이터를 전달하고 바로 다음 데이터를 처리할 수 있어서 비동기 처리를 쉽게 구현할 수 있다.
RxJava 특징
감시 대상 객체의 상태가 변하면 이를 관찰하는 객체에 알려주는 Observer패턴 사용
쉬운 비동기 처리, 스레드 관리의 자유로움
함수형 프로그래밍의 영향을 받아, 입력과 결과만 정해져 있다면 구체적 처리는 자유로이 구현할 수 있음
Reactive Streams
라이브러리나 프레임워크에 관계 없이 데이터 스트림을 비동기로 다룰 수 있는 메커니즘
Reactive Streams는 데이터를 생산하고 통지하는 Publisher(생산자)와 해당 데이터를 구독하여 처리하는 Subscriber(소비자)로 구성된다.
(생산자와 소비자의 데이터 처리 메커니즘)
Reactive Streams 규칙
구독 시작 통지(onSubscribe)는 한 구독에서 한번만 발생한다.
통지는 순차적으로 이루어진다.
null은 통지하지 않는다.
Publisher의 처리는 완료(onComplete) 또는 에러(onError)를 통지해서 종료시킨다.
추가로 데이터 개수 요청, 구독 해지를 수행하는 Subscription은 다음과 같은 규칙이 존재한다.
데이터 개수 요청에 Long.MAX_VALUE를 설정하면 데이터 개수에 의한 통지제한이 사라진다. 즉 데이터 개수를 요청하지 않아도 데이터 통지를 계속해서 받는다.
Subscription 메소드는 동기화 상태로 호출해야 한다. 즉 Subscription 메소드를 동시에 호출할 수 없다.
RxJava의 기본 구조
Reactive Streams를 지원하는 Flowable/Subscriber
Reactive Streams를 지원하지 않는 Observable/Observer
Observable과 Observer는 Reacitve Streams 인터페이스를 사용하지 않지만, onSubscribe, onNext, onError, onComplete의 구성은 동일하다. 단 데이터 개수를 제한하는 기능으로 Subscription 대신 Disposable 인터페이스를 사용한다.
연산자
생산자가 통지한 데이터가 소비자에게 도착하기 전에 데이터를 가공하는 메서드들.
이 연산자들을 연결(Method Chain)하여 최종 통지 데이터의 처리를 단계적으로 구성할 수 있다.
연산자 예시
val flowable = Flowable .just(1,2,3,4,5,6,7,8,9,10) .filter { it % 2 == 0 } // 짝수 데이터만 통지한다. .map { it * 100 } // 데이터를 100배로 반환한다. flowable.subscribe{ println("data = $it") }
출력 결과
data = 200 data = 400 data = 600 data = 800 data = 1000
비동기 처리
Rxjava에서는 개발자가 직접 스레드를 관리하지 않게 스케줄러(Scheduler)를 제공하며, 이를 이용하여 어떤 스레드에서 무엇을 처리할지 제어할 수 있다.
비동기 처리시 생산자와 소비자는 데이터 통지시에만 데이터를 주고 받아야하며, 그 외의 요인으로 서로에 영향을 주지 않아야 한다.
외부 영향을 받는 예시
var calcMethod = State.ADD val flowable = Flowable .interval(300L, TimeUnit.MILLISECONDS) .take(7) .scan { sum, data -> if(calcMethod == State.ADD) { return@scan sum + data } else { return@scan sum * data } } flowable.subscribe{ println("data = $it") } Thread.sleep(1000) println("계산 방법 변경") calcMethod = State.MULTIPLY Thread.sleep(2000)
출력 결과
data = 0 data = 1 data = 3 계산 방법 변경 data = 9 data = 36 data = 180 data = 1080
Cold 생산자와 Hot 생산자
Cold생산자는 1개의 소비자와 구독 (Default)
Hot생산자는 여러 소비자와 구독가능
Hot 생산자는 이미 처리를 시작한 생산자를 구독하면 구독 시점부터 처리되고 있는 데이터를 받을 수 있고, 같은 데이터를 여러 소비자가 받을 수 있다.
마블 다이어그램
리액티브 프로그래밍에서 시간 경과에 따라 데이터가 전달되고 변화하는 과정을 표현한 그림
Rxjava 예제
Flowable & Subscriber
fun main() { //Floawable 생성 val flowable = Flowable.create(object : FlowableOnSubscribe { @Throws(Exception::class) override fun subscribe(emitter: FlowableEmitter) { val datas = listOf("Hello, World", "안녕, RxJava!") for (data in datas) { if (emitter.isCancelled) return //구독이 해지되면 처리 중단 emitter.onNext(data) //데이터 통지 } emitter.onComplete() } }, BackpressureStrategy.BUFFER) // 초과데이터 버퍼링 //Subscriber 처리를 개별 스레드에서 진행 flowable.observeOn(Schedulers.computation()) .subscribe(object : Subscriber { //데이터 개수 요청과 구독 해지하는 객체 private var subscription: Subscription? = null override fun onSubscribe(subscription: Subscription?) { this.subscription = subscription this.subscription?.request(1L) // 받을 데이터 개수 요청 } override fun onNext(data: String?) { val threadName = Thread.currentThread().name println("$threadName : $data") this.subscription?.request(1L) } override fun onComplete() { val threadName = Thread.currentThread().name println("$threadName : 완료") } override fun onError(error: Throwable?) { error?.printStackTrace() } }) Thread.sleep(500L) }
Observable & Observer
... emitter.onComplete() } }) // BackpressureStrategy 인자 없이, 데이터가 생성될 때 마다 바로 통지한다. ... override fun onNext(data: String?) { val threadName = Thread.currentThread().name println("$threadName : $data") // request 메서드가 없다 (데이터 개수를 요청하지 않는다.) }
출력 결과
RxComputationThreadPool-1 : Hello, World RxComputationThreadPool-1 : 안녕, RxJava! RxComputationThreadPool-1 : 완료
구독 중도해지 예제
fun main() { Flowable.interval(200L, TimeUnit.MILLISECONDS) .subscribe(object : Subscriber{ private var subscription : Subscription? = null private var startTime : Long = 0 override fun onSubscribe(subscription: Subscription?) { this.subscription = subscription this.startTime = System.currentTimeMillis() this.subscription?.request(Long.MAX_VALUE) } override fun onNext(data: Long?) { // 구독 시작부터 500밀리초가 지나면 구독을 해지하고 처리 중단 if ((System.currentTimeMillis() - startTime > 500)) { this.subscription?.cancel() // 구독 해지 println("구독 해지") return } println("data = $data") } override fun onError(t: Throwable?) { // } override fun onComplete() { // } }) Thread.sleep(2000L) }
출력 결과
data = 0 data = 1 구독 해지
onSubscribe 메서드에서 Subscription을 전달받고 요청을 호출해 통지를 시작한다
onNext 메서드로 200밀리초 후 데이터 0을 받는다.
onNext 메서드에서 구독을 시작하고 500밀리초가 넘었는지 확인한다.
500밀리초가 지나지 않았으므로(200) 다시 onNext 메서드에서 데이터를 출력한다.
400밀리초 후에 데이터 1을 받는다
구독이 500밀리초가 넘었는지 확인한다.
500밀리초가 넘지 않았으므로(400) 데이터를 출력한다.
600밀리초 후에 데이터 2를 받는다.
구독이 500밀리초가 넘었으므로 구독을 해지한다.
단순히 500밀리초가 경과하면 구독을 해지하는것이 아니라, onNext가 호출된 시점에서 500밀리초가 지났는지를 체크한다. 즉 onNext가 호출되지 않으면 500밀리초가 지나도 구독이 해지되지 않는다.
observeOn
RxJava에서는 데이터를 통지하는 측과 전달받는 측의 처리를 별도의 스레드에서 실행할 때, Scheduler객체를 observeOn 의 인자로 설정해 어떤 스레드에서 실행할 지 지정할 수 있다. (Android에서 UI 스레드와 데이터 처리등에 사용하는 IO 스레드를 구분하여 사용 가능)
subscribe
Publisher의 subscribe 메서드는 publisher와 subscriber 사이의 상호 작용이 외부로부터 영향을 받지 않기 위해 return값이 존재하지 않는다.
onSubscribe
RxJava 2.0.7 이후부터는 반드시 onSubscribe 메서드가 끝난 뒤에 onNext 메서드가 실행된다.
Single / Maybe / Completable
생산자가 되는 Flowable, Observable 외에도 Single, Maybe, Completable이라는 클래스를 제공한다
Single : 통지할 데이터가 반드시 1건일 때
Maybe : 데이터가 없거나 있다면 반드시 1건일 때
Completable : 데이터 없이 완료만 통지
또한 각각 SingleObserver, MaybeObserver, CompletableObserver라는 독자적인 소비자를 사용한다.
Single
데이터를 1건만 통지하거나 에러를 통지
데이터 통지가 곧 완료 통지이므로 별도의 완료 통지 안함
onNext와 onComplete 대신 onSuccess 제공
Maybe
데이터를 1건만 통지하거나, 1건도 통지않거나 에러 통지
데이터가 1건도 없이 정상종료 될 때만 onComplete 호출
onNext 대신 onSuccess 제공
Completable
데이터를 통지하지 않거나 에러 통지
Completable 내에서 특정 부가 작용이 발생하는 처리 수행
해당 처리가 끝나면 onComplete, 에러시 onError 호출
따라서 부가 작용 처리의 구독을 호출하는 스레드와 동일한 스레드에서 실행하면 사용의 의미가 없음
from http://seoplee.tistory.com/25 by ccl(A) rewrite - 2021-12-28 06:28:27