Refactored RxJava to Coroutines in LanguageViewModel.

* Removed duplicate method in `DownloadRoomDao` for fetching all downloads.
* Refactored `DownloadManagerMonitor` to use `Coroutines` instead of `RxJava`.
* Improved coroutine usage in `DownloadMonitorService`.
* Updated `LanguageViewModel` to use `Coroutine Flows` instead of `RxJava`.
* Refactored `NewLanguagesDao` to expose Coroutine Flows instead of `RxJava` observables.
* Updated `SaveLanguagesAndFinish` to use coroutines instead of `RxJava`.
* Created a `FlowExtension` utility class to add custom flow-related extensions. In this PR, added the `collectSideEffectWithActivity` extension function, which allows collecting `SideEffects` and handling them with the current Activity in a Compose UI. This also moved the `SideEffect` collection from the Fragment layer to the Compose screen, improving separation of concerns.
* Refactored the unit test cases according to this change.
This commit is contained in:
MohitMaliFtechiz 2025-05-14 19:34:13 +05:30
parent 6351b60755
commit 163dfd3844
13 changed files with 292 additions and 197 deletions

View File

@ -71,7 +71,7 @@ class LanguageFragment : BaseFragment() {
fun resetSearchState() {
// clears the search text and resets the filter
searchText = ""
languageViewModel.actions.offer(Action.Filter(searchText))
languageViewModel.actions.tryEmit(Action.Filter(searchText))
}
KiwixTheme {
@ -83,13 +83,13 @@ class LanguageFragment : BaseFragment() {
isSearchActive = isSearchActive,
onSearchClick = { isSearchActive = true },
onSaveClick = {
languageViewModel.actions.offer(Action.SaveAll)
languageViewModel.actions.tryEmit(Action.SaveAll)
}
),
onClearClick = { resetSearchState() },
onAppBarValueChange = {
searchText = it
languageViewModel.actions.offer(Action.Filter(it))
languageViewModel.actions.tryEmit(Action.Filter(it))
},
navigationIcon = {
NavigationIcon(
@ -113,18 +113,6 @@ class LanguageFragment : BaseFragment() {
)
}
}
compositeAdd(activity)
}
private fun compositeAdd(activity: CoreMainActivity) {
compositeDisposable.add(
languageViewModel.effects.subscribe(
{
it.invokeWith(activity)
},
Throwable::printStackTrace
)
)
}
private fun appBarActionMenuList(

View File

@ -30,14 +30,15 @@ import androidx.compose.foundation.lazy.rememberLazyListState
import androidx.compose.material3.ExperimentalMaterial3Api
import androidx.compose.material3.Scaffold
import androidx.compose.runtime.Composable
import androidx.compose.runtime.collectAsState
import androidx.compose.runtime.getValue
import androidx.compose.runtime.livedata.observeAsState
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.platform.LocalContext
import androidx.compose.ui.platform.LocalLayoutDirection
import androidx.compose.ui.unit.dp
import org.kiwix.kiwixmobile.core.R
import org.kiwix.kiwixmobile.core.extensions.CollectSideEffectWithActivity
import org.kiwix.kiwixmobile.core.ui.components.ContentLoadingProgressBar
import org.kiwix.kiwixmobile.core.ui.components.KiwixAppBar
import org.kiwix.kiwixmobile.core.ui.components.KiwixSearchView
@ -60,10 +61,12 @@ fun LanguageScreen(
onAppBarValueChange: (String) -> Unit,
navigationIcon: @Composable() () -> Unit = {}
) {
val state by languageViewModel.state.observeAsState(State.Loading)
val state by languageViewModel.state.collectAsState(State.Loading)
val listState: LazyListState = rememberLazyListState()
val context = LocalContext.current
languageViewModel.effects.CollectSideEffectWithActivity { effect, activity ->
effect.invokeWith(activity)
}
Scaffold(topBar = {
KiwixAppBar(
titleId = R.string.select_languages,
@ -106,7 +109,7 @@ fun LanguageScreen(
context = context,
listState = listState,
selectLanguageItem = { languageItem ->
languageViewModel.actions.offer(Action.Select(languageItem))
languageViewModel.actions.tryEmit(Action.Select(languageItem))
}
)
}

View File

@ -18,13 +18,18 @@
package org.kiwix.kiwixmobile.language.viewmodel
import org.kiwix.kiwixmobile.language.composables.LanguageListItem.LanguageItem
import androidx.lifecycle.MutableLiveData
import androidx.lifecycle.ViewModel
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.processors.PublishProcessor
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import org.kiwix.kiwixmobile.core.base.SideEffect
import org.kiwix.kiwixmobile.core.dao.NewLanguagesDao
import org.kiwix.kiwixmobile.language.composables.LanguageListItem.LanguageItem
import org.kiwix.kiwixmobile.language.viewmodel.Action.Filter
import org.kiwix.kiwixmobile.language.viewmodel.Action.SaveAll
import org.kiwix.kiwixmobile.language.viewmodel.Action.Select
@ -37,27 +42,40 @@ import javax.inject.Inject
class LanguageViewModel @Inject constructor(
private val languageDao: NewLanguagesDao
) : ViewModel() {
val state = MutableLiveData<State>().apply { value = Loading }
val actions = PublishProcessor.create<Action>()
val effects = PublishProcessor.create<SideEffect<*>>()
private val compositeDisposable = CompositeDisposable()
val state = MutableStateFlow<State>(Loading)
val actions = MutableSharedFlow<Action>(extraBufferCapacity = Int.MAX_VALUE)
val effects = MutableSharedFlow<SideEffect<*>>(extraBufferCapacity = Int.MAX_VALUE)
private val coroutineJobs = mutableListOf<Job>()
init {
compositeDisposable.addAll(
actions.map { state.value?.let { value -> reduce(it, value) } }
.distinctUntilChanged()
.subscribe(state::postValue, Throwable::printStackTrace),
languageDao.languages().filter { it.isNotEmpty() }
.subscribe(
{ actions.offer(UpdateLanguages(it)) },
Throwable::printStackTrace
)
)
coroutineJobs.apply {
add(observeActions())
add(observeLanguages())
}
}
private fun observeActions() =
viewModelScope.launch {
actions
.map { action -> reduce(action, state.value) }
.distinctUntilChanged()
.collect { newState -> state.value = newState }
}
private fun observeLanguages() =
viewModelScope.launch {
languageDao.languages()
.filter { it.isNotEmpty() }
.collect { languages ->
actions.tryEmit(UpdateLanguages(languages))
}
}
override fun onCleared() {
compositeDisposable.clear()
coroutineJobs.forEach {
it.cancel()
}
coroutineJobs.clear()
super.onCleared()
}
@ -71,17 +89,20 @@ class LanguageViewModel @Inject constructor(
Loading -> Content(action.languages)
else -> currentState
}
is Filter -> {
when (currentState) {
is Content -> filterContent(action.filter, currentState)
else -> currentState
}
}
is Select ->
when (currentState) {
is Content -> updateSelection(action.language, currentState)
else -> currentState
}
SaveAll ->
when (currentState) {
is Content -> saveAll(currentState)
@ -91,10 +112,11 @@ class LanguageViewModel @Inject constructor(
}
private fun saveAll(currentState: Content): State {
effects.offer(
effects.tryEmit(
SaveLanguagesAndFinish(
currentState.items,
languageDao
languageDao,
viewModelScope
)
)
return Saving

View File

@ -18,24 +18,30 @@
package org.kiwix.kiwixmobile.language.viewmodel
import androidx.appcompat.app.AppCompatActivity
import io.reactivex.Flowable
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.kiwix.kiwixmobile.core.base.SideEffect
import org.kiwix.kiwixmobile.core.dao.NewLanguagesDao
import org.kiwix.kiwixmobile.core.zim_manager.Language
@Suppress("IgnoredReturnValue", "CheckResult")
@Suppress("InjectDispatcher", "TooGenericExceptionCaught")
data class SaveLanguagesAndFinish(
val languages: List<Language>,
val languageDao: NewLanguagesDao
private val languages: List<Language>,
private val languageDao: NewLanguagesDao,
private val lifecycleScope: CoroutineScope
) : SideEffect<Unit> {
override fun invokeWith(activity: AppCompatActivity) {
Flowable.fromCallable { languageDao.insert(languages) }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
lifecycleScope.launch {
try {
withContext(Dispatchers.IO) {
languageDao.insert(languages)
}
activity.onBackPressedDispatcher.onBackPressed()
}, Throwable::printStackTrace)
} catch (e: Throwable) {
e.printStackTrace()
}
}
}
}

View File

@ -285,7 +285,7 @@ class ZimManageViewModel @Inject constructor(
val downloads = downloadDao.downloads().asFlowable()
val booksFromDao = books().asFlowable()
val networkLibrary = PublishProcessor.create<LibraryNetworkEntity>()
val languages = languageDao.languages()
val languages = languageDao.languages().asFlowable()
return arrayOf(
updateLibraryItems(booksFromDao, downloads, networkLibrary, languages),
updateLanguagesInDao(networkLibrary, languages),
@ -451,7 +451,7 @@ class ZimManageViewModel @Inject constructor(
booksFromDao: io.reactivex.rxjava3.core.Flowable<List<BookOnDisk>>,
downloads: io.reactivex.rxjava3.core.Flowable<List<DownloadModel>>,
library: Flowable<LibraryNetworkEntity>,
languages: Flowable<List<Language>>
languages: io.reactivex.rxjava3.core.Flowable<List<Language>>
) = Flowable.combineLatest(
booksFromDao,
downloads,
@ -481,7 +481,7 @@ class ZimManageViewModel @Inject constructor(
private fun updateLanguagesInDao(
library: Flowable<LibraryNetworkEntity>,
languages: Flowable<List<Language>>
languages: io.reactivex.rxjava3.core.Flowable<List<Language>>
) = library
.subscribeOn(Schedulers.io())
.map(LibraryNetworkEntity::book)

View File

@ -18,13 +18,13 @@
package org.kiwix.kiwixmobile.language.viewmodel
import com.jraska.livedata.test
import androidx.lifecycle.viewModelScope
import io.mockk.clearAllMocks
import io.mockk.every
import io.mockk.mockk
import io.reactivex.processors.PublishProcessor
import io.reactivex.schedulers.Schedulers
import org.junit.jupiter.api.AfterAll
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.runTest
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
@ -37,130 +37,169 @@ import org.kiwix.kiwixmobile.language.viewmodel.Action.Select
import org.kiwix.kiwixmobile.language.viewmodel.Action.UpdateLanguages
import org.kiwix.kiwixmobile.language.viewmodel.State.Content
import org.kiwix.kiwixmobile.language.viewmodel.State.Loading
import org.kiwix.kiwixmobile.language.viewmodel.State.Saving
import org.kiwix.kiwixmobile.zimManager.testFlow
import org.kiwix.sharedFunctions.InstantExecutorExtension
import org.kiwix.sharedFunctions.language
import org.kiwix.sharedFunctions.resetSchedulers
import org.kiwix.sharedFunctions.setScheduler
fun languageItem(language: Language = language()) =
LanguageListItem.LanguageItem(language)
@ExtendWith(InstantExecutorExtension::class)
class LanguageViewModelTest {
init {
setScheduler(Schedulers.trampoline())
}
@AfterAll
fun teardown() {
resetSchedulers()
}
private val newLanguagesDao: NewLanguagesDao = mockk()
private lateinit var languageViewModel: LanguageViewModel
private val languages: PublishProcessor<List<Language>> = PublishProcessor.create()
private lateinit var languages: MutableStateFlow<List<Language>>
@BeforeEach
fun init() {
clearAllMocks()
languages = MutableStateFlow(emptyList())
every { newLanguagesDao.languages() } returns languages
languageViewModel =
LanguageViewModel(newLanguagesDao)
}
@Test
fun `initial state is Loading`() {
languageViewModel.state.test()
.assertValueHistory(Loading)
fun `initial state is Loading`() = runTest {
testFlow(
flow = languageViewModel.state,
triggerAction = {},
assert = { assertThat(awaitItem()).isEqualTo(Loading) }
)
}
@Test
fun `an empty languages emission does not send update action`() {
languageViewModel.actions.test()
.also {
languages.offer(listOf())
}
.assertValues()
fun `an empty languages emission does not send update action`() = runTest {
testFlow(
languageViewModel.actions,
triggerAction = { languages.emit(listOf()) },
assert = { expectNoEvents() }
)
}
@Test
fun `a languages emission sends update action`() {
fun `a languages emission sends update action`() = runTest {
val expectedList = listOf(language())
languageViewModel.actions.test()
.also {
languages.offer(expectedList)
testFlow(
languageViewModel.actions,
triggerAction = { languages.emit(expectedList) },
assert = {
assertThat(awaitItem()).isEqualTo(UpdateLanguages(expectedList))
}
.assertValues(UpdateLanguages(expectedList))
)
}
@Test
fun `UpdateLanguages Action changes state to Content when Loading`() {
languageViewModel.actions.offer(UpdateLanguages(listOf()))
languageViewModel.state.test()
.assertValueHistory(Content(listOf()))
}
@Test
fun `UpdateLanguages Action has no effect on other states`() {
languageViewModel.actions.offer(UpdateLanguages(listOf()))
languageViewModel.actions.offer(UpdateLanguages(listOf()))
languageViewModel.state.test()
.assertValueHistory(Content(listOf()))
}
@Test
fun `Filter Action updates Content state `() {
languageViewModel.actions.offer(UpdateLanguages(listOf()))
languageViewModel.actions.offer(Filter("filter"))
languageViewModel.state.test()
.assertValueHistory(Content(listOf(), filter = "filter"))
}
@Test
fun `Filter Action has no effect on other states`() {
languageViewModel.actions.offer(Filter(""))
languageViewModel.state.test()
.assertValueHistory(Loading)
}
@Test
fun `Select Action updates Content state`() {
languageViewModel.actions.offer(UpdateLanguages(listOf(language())))
languageViewModel.actions.offer(Select(languageItem()))
languageViewModel.state.test()
.assertValueHistory(Content(listOf(language(isActive = true))))
}
@Test
fun `Select Action has no effect on other states`() {
languageViewModel.actions.offer(Select(languageItem()))
languageViewModel.state.test()
.assertValueHistory(Loading)
}
@Test
fun `SaveAll changes Content to Saving with SideEffect SaveLanguagesAndFinish`() {
languageViewModel.actions.offer(UpdateLanguages(listOf()))
languageViewModel.effects.test()
.also {
languageViewModel.actions.offer(SaveAll)
fun `UpdateLanguages Action changes state to Content when Loading`() = runTest {
testFlow(
languageViewModel.state,
triggerAction = { languageViewModel.actions.emit(UpdateLanguages(listOf())) },
assert = {
assertThat(awaitItem()).isEqualTo(Loading)
assertThat(awaitItem()).isEqualTo(Content(listOf()))
}
.assertValues(
SaveLanguagesAndFinish(
listOf(),
newLanguagesDao
)
}
@Test
fun `UpdateLanguages Action has no effect on other states`() = runTest {
testFlow(
languageViewModel.state,
triggerAction = {
languageViewModel.actions.emit(UpdateLanguages(listOf()))
languageViewModel.actions.emit(UpdateLanguages(listOf()))
},
assert = {
assertThat(awaitItem()).isEqualTo(Loading)
assertThat(awaitItem()).isEqualTo(Content(listOf()))
}
)
}
@Test
fun `Filter Action updates Content state `() = runTest {
testFlow(
languageViewModel.state,
triggerAction = {
languageViewModel.actions.tryEmit(UpdateLanguages(listOf()))
languageViewModel.actions.tryEmit(Filter("filter"))
},
assert = {
assertThat(awaitItem()).isEqualTo(Loading)
assertThat(awaitItem()).isEqualTo(Content(items = listOf(), filter = ""))
assertThat(awaitItem()).isEqualTo(Content(listOf(), filter = "filter"))
}
)
}
@Test
fun `Filter Action has no effect on other states`() = runTest {
testFlow(
languageViewModel.state,
triggerAction = { languageViewModel.actions.emit(Filter("")) },
assert = {
assertThat(awaitItem()).isEqualTo(Loading)
}
)
}
@Test
fun `Select Action updates Content state`() = runTest {
testFlow(
languageViewModel.state,
triggerAction = {
languageViewModel.actions.emit(UpdateLanguages(listOf(language())))
languageViewModel.actions.emit(Select(languageItem()))
},
assert = {
assertThat(awaitItem()).isEqualTo(Loading)
assertThat(awaitItem()).isEqualTo(Content(listOf(language())))
assertThat(awaitItem()).isEqualTo(Content(listOf(language(isActive = true))))
}
)
}
@Test
fun `Select Action has no effect on other states`() = runTest {
testFlow(
languageViewModel.state,
triggerAction = { languageViewModel.actions.emit(Select(languageItem())) },
assert = {
assertThat(awaitItem()).isEqualTo(Loading)
}
)
}
@Test
fun `SaveAll changes Content to Saving with SideEffect SaveLanguagesAndFinish`() = runTest {
val languages = listOf<Language>()
testFlow(
flow = languageViewModel.effects,
triggerAction = {
languageViewModel.actions.emit(UpdateLanguages(languages))
languageViewModel.actions.emit(SaveAll)
},
assert = {
assertThat(awaitItem()).isEqualTo(
SaveLanguagesAndFinish(
languages,
newLanguagesDao,
languageViewModel.viewModelScope
)
)
)
languageViewModel.state.test()
.assertValueHistory(Saving)
}
)
testFlow(flow = languageViewModel.state, triggerAction = {}, assert = {
assertThat(awaitItem()).isEqualTo(State.Saving)
})
}
@Test
fun `SaveAll has no effect on other states`() {
languageViewModel.actions.offer(SaveAll)
languageViewModel.state.test()
.assertValueHistory(Loading)
fun `SaveAll has no effect on other states`() = runTest {
testFlow(
languageViewModel.state,
triggerAction = { languageViewModel.actions.emit(SaveAll) },
{ assertThat(awaitItem()).isEqualTo(Loading) }
)
}
}

View File

@ -20,31 +20,30 @@ package org.kiwix.kiwixmobile.language.viewmodel
import androidx.activity.OnBackPressedDispatcher
import androidx.appcompat.app.AppCompatActivity
import io.mockk.coEvery
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test
import org.kiwix.kiwixmobile.core.dao.NewLanguagesDao
import org.kiwix.kiwixmobile.core.zim_manager.Language
import org.kiwix.sharedFunctions.resetSchedulers
import org.kiwix.sharedFunctions.setScheduler
class SaveLanguagesAndFinishTest {
@Test
fun `invoke saves and finishes`() {
setScheduler(Schedulers.trampoline())
fun `invoke saves and finishes`() = runTest {
val languageDao = mockk<NewLanguagesDao>()
val activity = mockk<AppCompatActivity>()
val lifeCycleScope = TestScope(testScheduler)
val onBackPressedDispatcher = mockk<OnBackPressedDispatcher>()
every { activity.onBackPressedDispatcher } returns onBackPressedDispatcher
every { onBackPressedDispatcher.onBackPressed() } answers { }
val languages = listOf<Language>()
SaveLanguagesAndFinish(languages, languageDao).invokeWith(activity)
verify {
languageDao.insert(languages)
onBackPressedDispatcher.onBackPressed()
}
resetSchedulers()
SaveLanguagesAndFinish(languages, languageDao, lifeCycleScope).invokeWith(activity)
testScheduler.advanceUntilIdle()
coEvery { languageDao.insert(languages) }
testScheduler.advanceUntilIdle()
verify { onBackPressedDispatcher.onBackPressed() }
}
}

View File

@ -121,7 +121,7 @@ class ZimManageViewModelTest {
private val downloads = MutableStateFlow<List<DownloadModel>>(emptyList())
private val booksOnFileSystem = MutableStateFlow<List<BookOnDisk>>(emptyList())
private val books = MutableStateFlow<List<BookOnDisk>>(emptyList())
private val languages: PublishProcessor<List<Language>> = PublishProcessor.create()
private val languages = MutableStateFlow<List<Language>>(emptyList())
private val fileSystemStates: BehaviorProcessor<FileSystemState> = BehaviorProcessor.create()
private val networkStates: PublishProcessor<NetworkState> = PublishProcessor.create()
private val booksOnDiskListItems = MutableStateFlow<List<BooksOnDiskListItem>>(emptyList())
@ -382,7 +382,7 @@ class ZimManageViewModelTest {
every { application.getString(any(), any()) } returns ""
every { kiwixService.library } returns Single.just(libraryNetworkEntity(networkBooks))
every { defaultLanguageProvider.provide() } returns defaultLanguage
languages.onNext(dbBooks)
languages.value = dbBooks
testScheduler.triggerActions()
networkStates.onNext(CONNECTED)
testScheduler.triggerActions()
@ -420,12 +420,11 @@ class ZimManageViewModelTest {
networkStates.onNext(CONNECTED)
downloads.value = listOf(downloadModel(book = bookDownloading))
books.value = listOf(bookOnDisk(book = bookAlreadyOnDisk))
languages.onNext(
languages.value =
listOf(
language(isActive = true, occurencesOfLanguage = 1, languageCode = "activeLanguage"),
language(isActive = false, occurencesOfLanguage = 1, languageCode = "inactiveLanguage")
)
)
fileSystemStates.onNext(CanWrite4GbFile)
testScheduler.advanceTimeBy(500, MILLISECONDS)
testScheduler.triggerActions()
@ -458,11 +457,10 @@ class ZimManageViewModelTest {
networkStates.onNext(CONNECTED)
downloads.value = listOf()
books.value = listOf()
languages.onNext(
languages.value =
listOf(
language(isActive = true, occurencesOfLanguage = 1, languageCode = "activeLanguage")
)
)
fileSystemStates.onNext(CannotWrite4GbFile)
testScheduler.advanceTimeBy(500, MILLISECONDS)
testScheduler.triggerActions()

View File

@ -42,14 +42,11 @@ abstract class DownloadRoomDao {
@Inject
lateinit var newBookDao: NewBookDao
@Query("SELECT * FROM DownloadRoomEntity")
abstract fun downloadRoomEntity(): Flow<List<DownloadRoomEntity>>
@Query("SELECT * FROM DownloadRoomEntity")
abstract fun getAllDownloads(): Flow<List<DownloadRoomEntity>>
fun downloads(): Flow<List<DownloadModel>> =
downloadRoomEntity()
getAllDownloads()
.distinctUntilChanged()
.onEach(::moveCompletedToBooksOnDiskDao)
.map { it.map(::DownloadModel) }

View File

@ -36,7 +36,7 @@ import javax.inject.Singleton
@Singleton
class NewLanguagesDao @Inject constructor(private val box: Box<LanguageEntity>) {
fun languages() =
box.asFlowable()
box.asFlow()
.map { it.map(LanguageEntity::toLanguageModel) }
fun insert(languages: List<Language>) {

View File

@ -18,16 +18,18 @@
package org.kiwix.kiwixmobile.core.downloader.downloadManager
import android.annotation.SuppressLint
import android.content.Context
import com.tonyodev.fetch2.Download
import com.tonyodev.fetch2.Error
import com.tonyodev.fetch2.Fetch
import com.tonyodev.fetch2.FetchListener
import com.tonyodev.fetch2core.DownloadBlock
import io.reactivex.disposables.Disposable
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.PublishSubject
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import org.kiwix.kiwixmobile.core.dao.DownloadRoomDao
import org.kiwix.kiwixmobile.core.downloader.DownloadMonitor
import javax.inject.Inject
@ -37,23 +39,28 @@ const val FIVE = 5
const val HUNDERED = 100
const val DEFAULT_INT_VALUE = -1
@SuppressLint("CheckResult")
@Suppress("InjectDispatcher")
class DownloadManagerMonitor @Inject constructor(
val fetch: Fetch,
val context: Context,
val downloadRoomDao: DownloadRoomDao,
private val fetchDownloadNotificationManager: FetchDownloadNotificationManager
) : DownloadMonitor {
private val updater = PublishSubject.create<() -> Unit>()
private var updaterDisposable: Disposable? = null
private val taskFlow = MutableSharedFlow<suspend () -> Unit>(extraBufferCapacity = Int.MAX_VALUE)
private var updaterJob: Job? = null
private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
@Suppress("TooGenericExceptionCaught")
private fun setupUpdater() {
updaterDisposable = updater.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(
{ it.invoke() },
Throwable::printStackTrace
)
updaterJob = coroutineScope.launch {
taskFlow.collect { task ->
try {
task.invoke()
} catch (e: Exception) {
e.printStackTrace()
}
}
}
}
private val fetchListener = object : FetchListener {
@ -123,7 +130,7 @@ class DownloadManagerMonitor @Inject constructor(
}
private fun update(download: Download) {
updater.onNext {
taskFlow.tryEmit {
downloadRoomDao.update(download)
if (download.isPaused()) {
fetchDownloadNotificationManager.showDownloadPauseNotification(fetch, download)
@ -132,7 +139,7 @@ class DownloadManagerMonitor @Inject constructor(
}
private fun delete(download: Download) {
updater.onNext { downloadRoomDao.delete(download) }
taskFlow.tryEmit { downloadRoomDao.delete(download) }
}
override fun startMonitoringDownload() {
@ -142,6 +149,6 @@ class DownloadManagerMonitor @Inject constructor(
override fun stopListeningDownloads() {
fetch.removeListener(fetchListener)
updaterDisposable?.dispose()
updaterJob?.cancel()
}
}

View File

@ -40,7 +40,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import org.kiwix.kiwixmobile.core.CoreApp
@ -55,7 +55,7 @@ const val THIRTY_TREE = 33
@Suppress("InjectDispatcher")
class DownloadMonitorService : Service() {
private val updaterChannel = Channel<suspend () -> Unit>(Channel.UNLIMITED)
private val taskFlow = MutableSharedFlow<suspend () -> Unit>(extraBufferCapacity = Int.MAX_VALUE)
private var updaterJob: Job? = null
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
private val notificationManager: NotificationManager by lazy {
@ -87,10 +87,10 @@ class DownloadMonitorService : Service() {
@Suppress("TooGenericExceptionCaught")
private fun setupUpdater() {
updaterJob = scope.launch {
for (task in updaterChannel) {
taskFlow.collect { task ->
try {
task()
} catch (e: Throwable) {
task.invoke()
} catch (e: Exception) {
e.printStackTrace()
}
}
@ -120,7 +120,7 @@ class DownloadMonitorService : Service() {
* should be canceled if the user cancels the download.
*/
private fun setForegroundNotification(downloadId: Int? = null) {
scope.launch {
taskFlow.tryEmit {
// Cancel the ongoing download notification if the user cancels the download.
downloadId?.let(::cancelNotificationForId)
fetch.getDownloads { downloadList ->
@ -228,14 +228,14 @@ class DownloadMonitorService : Service() {
download: Download,
shouldSetForegroundNotification: Boolean = false
) {
scope.launch {
taskFlow.tryEmit {
downloadRoomDao.update(download)
if (download.status == Status.COMPLETED) {
downloadRoomDao.getEntityForDownloadId(download.id.toLong())?.let {
showDownloadCompletedNotification(download)
// to move these downloads in NewBookDao.
@Suppress("IgnoredReturnValue")
downloadRoomDao.allDownloads().first()
downloadRoomDao.downloads().first()
}
}
// If someone pause the Download then post a notification since fetch removes the
@ -252,7 +252,7 @@ class DownloadMonitorService : Service() {
}
private fun delete(download: Download) {
scope.launch {
taskFlow.tryEmit {
downloadRoomDao.delete(download)
setForegroundNotification(download.id)
}
@ -263,7 +263,7 @@ class DownloadMonitorService : Service() {
fetch: Fetch,
download: Download
) {
scope.launch {
taskFlow.tryEmit {
// Check if there are any ongoing downloads.
// If the list is empty, it means no other downloads are running,
// so we need to promote this download to a foreground service.
@ -369,7 +369,6 @@ class DownloadMonitorService : Service() {
* Stops the foreground service, disposes of resources, and removes the Fetch listener.
*/
private fun stopForegroundServiceForDownloads() {
updaterChannel.close()
updaterJob?.cancel()
fetch.removeListener(fetchListener)
stopForeground(STOP_FOREGROUND_DETACH)

View File

@ -0,0 +1,37 @@
/*
* Kiwix Android
* Copyright (c) 2025 Kiwix <android.kiwix.org>
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.kiwix.kiwixmobile.core.extensions
import androidx.activity.compose.LocalActivity
import androidx.compose.runtime.Composable
import androidx.compose.runtime.LaunchedEffect
import kotlinx.coroutines.flow.Flow
import org.kiwix.kiwixmobile.core.main.CoreMainActivity
@Composable
fun <T> Flow<T>.CollectSideEffectWithActivity(
invokeWithActivity: (T, CoreMainActivity) -> Unit
) {
val activity = LocalActivity.current as? CoreMainActivity
LaunchedEffect(Unit) {
collect { effect ->
activity?.let { invokeWithActivity(effect, it) }
}
}
}