Background
I’m trying to make the repository and use cases return a Flow.
Since I’m using a paging source, I need to consume my Flow before being able to use it within the suspend fun load(params: LoadParams<Key>): LoadResult<Key, Value>
async method.
The RxJava approach is: Observable.toSingle,
which I expect to be Flow.first()
.
Issue
However, it returns a NoSuchElementException
.
Code snippets
Repo
@Singleton
class DefaultPostRepository @Inject constructor(
private val postService: PostService,
private val postMapper: PostMapper,
@IoDispatcher val ioDispatcher: CoroutineDispatcher
) : PostsRepository {
@RequiresExtension(extension = Build.VERSION_CODES.S, version = 7)
override suspend fun getPosts(page: Int) = flow<NetworkResult<List<PostModel>>> {
Log.e("myapp", "DefaultPostRepository ${Thread.currentThread().name}")
when (val postsResponse = postService.getPosts(page)) {
is NetworkError -> {
NetworkError(postsResponse.code, postsResponse.message)
}
is NetworkException -> {
NetworkException(postsResponse.e)
}
is NetworkSuccess -> {
NetworkSuccess(postMapper.fromListDto(postsResponse.data.data))
}
}
}.flowOn(ioDispatcher)
Use case
class GetPostUseCase @Inject constructor(private val postRepository: PostsRepository) {
suspend fun invoke(nextPage: Int) = postRepository.getPosts(nextPage)
}
PagingSource
class PostPagingSource(private val getPostUseCase: GetPostUseCase) :
PagingSource<Int, PostModel>() {
// The refresh key is used for the initial load of the next PagingSource, after invalidation
override fun getRefreshKey(state: PagingState<Int, PostModel>): Int? {
// We need to get the previous key (or next key if previous is null) of the page
// that was closest to the most recently accessed index.
// Anchor position is the most recently accessed index
return state.anchorPosition?.let { anchorPosition ->
state.closestPageToPosition(anchorPosition)?.prevKey?.plus(1)
?: state.closestPageToPosition(anchorPosition)?.nextKey?.minus(1)
}
}
override suspend fun load(params: LoadParams<Int>): LoadResult<Int, PostModel> {
val nextPage: Int = params.key ?: STARTING_PAGE_INDEX
// Log.e("myapp", "PostPagingSource ${Thread.currentThread().name}")
//runBlocking { result = getPostUseCase.invoke(nextPage).first() }
return when (val result = getPostUseCase.invoke(nextPage).first()) {
is NetworkSuccess ->
LoadResult.Page(
data = result.data,
prevKey = if (nextPage == STARTING_PAGE_INDEX) null else nextPage - 1,
nextKey = if (result.data.isEmpty()) null else nextPage + 1,
)
is NetworkError -> LoadResult.Error(Throwable(result.message))
is NetworkException -> {
LoadResult.Error(result.e)
}
}
}
}
Any help would be appreciated.