callbackFlow로 callback api를 flow로 전환하기

4분 읽기
#kotlin-coroutine#flow

안드로이드 개발을 하다 보면 위치 정보(Location), 텍스트 변경(TextWatcher), 센서 데이터 등과 같이 시스템이나 외부 라이브러리가 콜백(Callback)으로 던져주는 데이터를 처리해야 할 때가 있다.

이런 비동기 콜백 데이터를 코틀린의 Flow 파이프라인으로 가져오기 위해 사용하는 것이 callbackFlow다.

Flow와 callbackFlow의 가장 큰 차이는 "데이터를 누가 생산하는가"이다. flow는 능동, callbackFlow는 수동이라고 개념을 잡자.

val simpleFlow = flow {
    for (i in 1..3) {
        delay(1000) // 원할 때 멈출(suspend) 수 있음
        emit(i)     // 직접 배달 (suspend)
    }
}

flow는 개발자가 정의한 로직에 따라 데이터를 순차적으로 만들어서 내보낸다.

// 외부(리스너)가 데이터를 줄 때까지 기다림
val locationFlow = callbackFlow {
    val listener = object : LocationListener {
        override fun onLocationUpdate(location: String) {
            // emit(location) -> ❌ 호출 불가 (Suspend 함수가 아니기 때문)
            trySend(location) // ✅ OK (비동기 채널에 던져넣기)
        }
    }
}

반대로 callbackFlow는 데이터가 언제 올지 모른다. 외부(시스템, UI, 라이브러리)에서 콜백을 호출해줄 때, 데이터를 받아와야 된다.

callbackFlow 구조

리스너 등록 - 데이터 전달 - 종료 처리로 구성된다.

특히 종료처리는 awaitClose로 되는데, flow가 닫힐 때 리스너를 해제하도록 하고, flow를 구독중일 때 suspend를 먹여 callback을 받는 flow가 닫히지 않도록 해주는 역할을 한다.

fun getLocationFlow(): Flow<String> = callbackFlow {
    // 1. 리스너 정의
    val listener = object : LocationListener {
        override fun onLocationUpdate(location: String) {
            // 2. 데이터가 들어오면 Flow 파이프라인으로 전송
            trySend(location) 
        }
    }

    // 3. 리스너 등록 (데이터 수신 시작)
    locationManager.registerListener(listener)

    // 4. 종료 대기 및 청소
    // Flow 수집이 끝날 때까지 여기서 대기(Suspend)하다가, 종료 신호가 오면 블록 내부를 실행함
    awaitClose {
        locationManager.unregisterListener(listener)
    }
}

awaitClose는 suspendCancellableCoroutine으로 되어있다. awaitClose를 안달아두면 코드 실행이 다 됐다고 판단해서, 데이터를 trySend에서 보낼 flow가 죽어버린다.

public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
    check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can only be invoked from the producer context" }
    try {
        suspendCancellableCoroutine<Unit> { cont ->
            invokeOnClose {
                cont.resume(Unit)
            }
        }
    } finally {
        block()
    }
}

suspendCancellableCoroutine은 현재 코루틴을 '일시 중지(Suspend)' 시키고 누군가 cont.resume()을 호출해서 깨워줄 때까지 멈춰 세우는데, 채널이 취소되거나 닫히면 cont.resume이 호출돼서 awaitClose가 실행되어 callbackFlow도 종료된다.

왜 emit이 아니라 trySend인가?

public fun <T> callbackFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> = CallbackFlowBuilder(block)
public fun trySend(element: E): ChannelResult<Unit>
public suspend fun emit(value: T)

일단 emit은 suspend 함수고, trySend는 일반 함수다. 콜백함수는 suspend가 아니기 때문에, emit을 사용하기 부적절하다는 점이 첫번째로 알고 들어가야하는 부분이다.

callbackFlow는 내부적으로 채널(Channel)이라는 버퍼(Queue)를 가지고 있다. 컨베이어 벨트를 생각하면 된다.

  • emit() (Suspend 함수)

받는 사람이 가져갈 때까지 벨트 앞에서 기다린다. 콜백 함수(onLocationUpdate)는 시스템이 호출하는 일반 함수인데, 여기서 코드 실행을 멈추고(Suspend) 기다리면 앱이 버벅이거나 ANR이 발생해 크래시가 날 수 있다.

  • trySend() (일반 함수)

일단 벨트 위에 물건을 툭 던져놓고 바로 자기 할 일을 하러 간다. 즉, 기다리지 않고(Non-blocking) 채널에 자리가 있으면 성공(Success), 꽉 찼으면 실패(Failure)를 반환한다.

trySend는 데이터를 내부 Channel/Buffer에 집어넣는 역할을 하며, collect하는 쪽에서 이 데이터를 하나씩 꺼내간다. 우리가 아는 flow로 바꿔주는 것이다.

callbackFlow는 cold-flow이다.

callbackFlow는 기본적으로 Cold Flow다. 이는 "구독자(Collector)가 생길 때마다 코드를 처음부터 다시 실행한다"는 뜻으로 코드 로직에 따라 매번 인스턴스를 생성하게 될 수도 있다는 것이다.

val locationFlow = repository.getLocationFlow() // Cold Flow

// 화면 A에서 구독 -> 리스너 등록 1
lifecycleScope.launch { locationFlow.collect { ... } } 

// 화면 B에서 구독 -> 리스너 등록 2 -> 이미 있는 리스너를 재사용하지않음
lifecycleScope.launch { locationFlow.collect { ... } }

동일한 역할을 하는 리스너를 2개나 등록하는 것은 자원 낭비다. 이를 해결하기 위해 Hot Flow(SharedFlow, StateFlow)로 변환해주는 작업이 필요하다.

shareIn, stateIn으로 간단하게 바꿀 수 있다.

val sharedFlow = rawFlow.shareIn(
    scope = viewModelScope,
    started = SharingStarted.WhileSubscribed(5000), // 5초 여유
    replay = 1 // 신규 구독자에게 최근 데이터 1개 재발송
)

shareIn은 SharedFlow를 생각하면 된다. 초기값이 필요 없고, 이벤트가 발생했을 때만 전파한다. 일회성 작업에 적절하다.

val stateFlow = rawFlow.stateIn(
    scope = viewModelScope,
    started = SharingStarted.WhileSubscribed(5000),
    initialValue = "위치 확인 중..."
)

stateIn은 StateFlow에 적절하다. 초기값이 필수이며, 항상 현재의 상태를 가지고 있다.

WhileSubscribed(5000)의 의미는 구독자가 사라져도 5초간은 연결을 끊지 말고 기다리라는 의미다. 구성변경이 발생하면 액티비티가 파괴되었다가 재생성되는데, 그 때 잠시 구독자가 0명이 되어 리스너에 완전히 새로 연결되는 경우가 생긴다.

이 때 생기는 비효율을 막기위해 WhileSubscribed 옵션을 넣어준다.

Comments