admin管理员组文章数量:1125959
I have an Android app that uses Ktor to make HTTP requests. Outside a Worker, the request works as expected, but when I call the request inside a worker, they always timeout: io.ktor.clientwork.sockets.ConnectTimeoutException: Connect timeout has expired [url=, connect_timeout=unknown ms]
. Can't figure out why it is happening.
All functions call getClient
. Outside the Worker, I call createBucket
, and it works as expected. Inside the Worker I call uploadImages
and downloadImages
, which timeout (in getClient
function request).
Class with the HTTP requests:
private class B2Client(
val authorizationToken: String,
val apiUrl: String,
val accountId: String,
val client: HttpClient
)
class BackblazeB2(val remote: Remote) :
CloudProvider {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
override suspend fun getRemoteImagesIds(): List<UUID> {
val url: String = "/b2api/v3/b2_list_file_names"
val response = scope.async(Dispatchers.IO) {
val b2Client = getClient(scope, remote.apiKeyId, remote.apiKey).await()
return@async b2Client.client.post(b2Client.apiUrl + url) {
headers {
append(HttpHeaders.Authorization, b2Client.authorizationToken)
}
contentType(ContentType.Application.Json)
setBody(
buildJsonObject {
put("bucketId", remote.bucketId)
}
)
}
}.await()
val body: JsonElement = response.body()
val files = body.jsonObject.getValue("files").jsonArray
val imagesIds = mutableListOf<UUID>()
files.forEach { file ->
val fileName: String = file.jsonObject.getValue("fileName").jsonPrimitive.content
imagesIds.add(UUID.fromString(fileName))
}
return imagesIds
}
override suspend fun uploadImages(images: List<Image>, context: Context): List<Deferred<UUID>> {
val uploadUrlUrl = "/b2api/v3/b2_get_upload_url"
val jobs: MutableList<Deferred<UUID>> = mutableListOf()
images.forEach { image ->
jobs.add(scope.async(Dispatchers.IO) {
val b2Client = getClient(scope, remote.apiKeyId, remote.apiKey).await()
val response = b2Client.client.get(b2Client.apiUrl + uploadUrlUrl) {
headers {
append(HttpHeaders.Authorization, b2Client.authorizationToken)
}
url {
parameters.append("bucketId", remote.bucketId)
}
}
val body: JsonElement = response.body()
val uploadUrl: String = body.jsonObject.getValue("uploadUrl").jsonPrimitive.content
val authorizationToken: String =
body.jsonObject.getValue("authorizationToken").jsonPrimitive.content
val imageBytes = getImageBytes(image.localPath!!, context)
val imageSHA1 = calculateSHA1(imageBytes)
val uploadResponse = b2Client.client.post(uploadUrl) {
headers {
append(HttpHeaders.Authorization, authorizationToken)
append(HttpHeaders.ContentType, image.contentType)
append("X-Bz-File-Name", image.uuid.toString())
append(HttpHeaders.ContentLength, imageBytes.size.toString())
append("X-Bz-Content-Sha1", sha1ToHex(imageSHA1!!))
}
setBody(imageBytes)
}
if (uploadResponse.status != HttpStatusCode.OK) {
throw Error("Image upload failed")
}
return@async image.uuid
})
}
return jobs
}
override suspend fun downloadImages(
imagesIds: List<UUID>,
context: Context
): List<Deferred<Image>> {
val downloadUrl = "/file"
val images: MutableList<Deferred<Image>> = mutableListOf()
imagesIds.forEach { imageId ->
images.add(scope.async(Dispatchers.IO) {
val b2Client = getClient(scope, remote.apiKeyId, remote.apiKey).await()
val downloadResponse = b2Client.client.get(b2Client.apiUrl + downloadUrl) {
url {
appendPathSegments(remote.name, imageId.toString())
}
headers {
append(HttpHeaders.Authorization, b2Client.authorizationToken)
}
}
val imageBytes: ByteArray = downloadResponse.body()
val contentType: String = downloadResponse.headers[HttpHeaders.ContentType]!!
val uri = saveImageToMediaStore(
context,
imageBytes,
imageId.toString(),
contentType
)
return@async Image(imageId, SyncStatus.BOTH, uri, imageId.toString(), contentType)
})
}
return images
}
companion object {
private const val BASE_URL = ";
suspend fun createBucket(apiKeyId: String, apiKey: String, name: String): Remote {
val scope = CoroutineScope(Dispatchers.IO + Job())
val url: String = "/b2api/v3/b2_create_bucket"
try {
val bucketId: String = scope.async(Dispatchers.IO) {
val b2Client = getClient(scope, apiKeyId, apiKey).await()
try {
val response = b2Client.client.post(b2Client.apiUrl + url) {
headers {
append(HttpHeaders.Authorization, b2Client.authorizationToken)
}
contentType(ContentType.Application.Json)
setBody(
buildJsonObject {
put("accountId", b2Client.accountId)
put("bucketName", name)
put("bucketType", "allPrivate")
putJsonObject("defaultServerSideEncryption") {
put("mode", "SSE-B2")
put("algorithm", "AES256")
}
}
)
}
val body: JsonElement = response.body()
return@async body.jsonObject.getValue("bucketId").jsonPrimitive.content
} catch (e: ClientRequestException) {
val url = "/b2api/v3/b2_list_buckets"
val response = b2Client.client.post(b2Client.apiUrl + url) {
headers {
append(HttpHeaders.Authorization, b2Client.authorizationToken)
}
contentType(ContentType.Application.Json)
setBody(
buildJsonObject {
put("accountId", b2Client.accountId)
}
)
}
val body: JsonElement = response.body()
val bucketJson =
body.jsonObject.getValue("buckets").jsonArray.first { elem ->
elem.jsonObject.getValue("bucketName").jsonPrimitive.content == name
}
return@async bucketJson.jsonObject.getValue("bucketId").jsonPrimitive.content
}
}.await()
return Remote(name, CloudProviders.BACKBLAZE, apiKeyId, apiKey, bucketId, false)
} finally {
scope.cancel()
}
}
private fun getClient(
scope: CoroutineScope,
apiKeyId: String,
apiKey: String
): Deferred<B2Client> {
val url = "/b2api/v3/b2_authorize_account"
val client = HttpClient(OkHttp) {
expectSuccess = true
install(ContentNegotiation) {
json(Json {
prettyPrint = true
isLenient = true
ignoreUnknownKeys = true
})
}
install(HttpTimeout) {
requestTimeoutMillis = 60000
}
install(HttpRequestRetry) {
maxRetries = 3
retryOnExceptionIf { request, cause -> cause is ConnectTimeoutException || cause is SocketTimeoutException }
}
}
return scope.async(Dispatchers.IO) {
try {
val response1: HttpResponse = client.get("/")
Log.d("BackBlaze B2", response1.status.value.toString())
val response: HttpResponse = client.get(BASE_URL + url) {
headers {
append(
HttpHeaders.Authorization,
"Basic" + encodeToBase64("$apiKeyId:$apiKey")
)
}
}
val jsonElement: JsonElement = response.body()
val authorizationToken =
jsonElement.jsonObject.getValue("authorizationToken").jsonPrimitive.content
val accountId =
jsonElement.jsonObject.getValue("accountId").jsonPrimitive.content
val apiUrl =
jsonElement
.jsonObject.getValue("apiInfo")
.jsonObject.getValue("storageApi")
.jsonObject.getValue("apiUrl")
.jsonPrimitive.content
return@async B2Client(authorizationToken, apiUrl, accountId, client)
} catch (e: Exception) {
e.printStackTrace()
throw e
}
}
}
}
override fun cancel() {
scope.cancel()
}
}
Worker:
class SyncWorker(val appContext: Context, workerParams: WorkerParameters) :
CoroutineWorker(appContext, workerParams) {
private val notificationManager =
applicationContext.getSystemService(NOTIFICATION_SERVICE) as NotificationManager
private val notificationChannelId = "image_sync_channel"
private val notificationId = 1
init {
createNotificationChannel()
}
override suspend fun doWork(): Result = withContext(Dispatchers.IO) {
try {
coroutineScope {
performSynchronization()
}
Result.success()
} catch (e: Exception) {
Result.failure()
}
}
private suspend fun performSynchronization() {
val remotesRepository = RemotesRepository(appContext)
val imagesRepository = MediaRepository(appContext)
val remote: Remote? = remotesRepository.getPrincipalRemote()
if (remote == null) return
val cloudProvider: CloudProvider = remote.getStrategy()
try {
val remoteImagesIds = cloudProvider.getRemoteImagesIds()
val localImages = imagesRepository.getImagesByStatus(SyncStatus.LOCAL)
val remoteAndLocalImagesIds =
imagesRepository.getImagesByStatus(SyncStatus.BOTH).map { image -> image.uuid }
val imagesToDownload = remoteImagesIds.minus(remoteAndLocalImagesIds)
val totalSyncFiles: Int = imagesToDownload.size + localImages.size
val uploadedImagesIds = cloudProvider.uploadImages(localImages, appContext)
val downloadedImages = cloudProvider.downloadImages(imagesToDownload, appContext)
val uploadJobs: MutableList<Job> = mutableListOf()
val downloadJobs: MutableList<Job> = mutableListOf()
var syncedFiles: Int = 0
fun updateNotification() {
syncedFiles++
val messase = "A sincronizar ficheiro $syncedFiles de $totalSyncFiles"
val progress = (syncedFiles.toFloat() / totalSyncFiles) * 100
showNotification(progress.toInt(), messase)
}
supervisorScope {
uploadedImagesIds.forEachIndexed { i, imageFuture ->
uploadJobs.add(launch(Dispatchers.IO) {
val image = imageFuture.await()
imagesRepository.updateImageStatus(image, SyncStatus.BOTH)
updateNotification()
})
}
downloadedImages.forEachIndexed { i, imageFuture ->
downloadJobs.add(launch(Dispatchers.IO) {
val image = imageFuture.await()
imagesRepository.addImage(image)
updateNotification()
})
}
}
uploadJobs.joinAll()
downloadJobs.joinAll()
imagesRepository.insertSyncHystory(
SyncHistory(
date = Date(),
syncType = SyncType.REMOTE
)
)
} finally {
cloudProvider.cancel()
}
}
private fun showNotification(progress: Int, message: String) {
if (ActivityCompat.checkSelfPermission(
appContext,
Manifest.permission.POST_NOTIFICATIONS
) != PackageManager.PERMISSION_GRANTED
) {
Log.d("SyncWorker", "No permissions for post notification")
return
}
val notification = createNotification(progress, message)
notificationManager.notify(notificationId, notification.build())
}
private fun createNotification(progress: Int, message: String): NotificationCompat.Builder {
val intent = Intent(appContext, MainActivity::class.java).apply {
flags = Intent.FLAG_ACTIVITY_NEW_TASK or Intent.FLAG_ACTIVITY_CLEAR_TASK
}
val pendingIntent: PendingIntent =
PendingIntent.getActivity(appContext, 0, intent, PendingIntent.FLAG_IMMUTABLE)
return NotificationCompat.Builder(appContext, notificationChannelId)
.setSmallIcon(R.drawable.ic_launcher_foreground) // Replace with your icon
.setContentTitle("A sincronizar imagens...")
.setContentText(message)
.setProgress(100, progress, false)
.setOngoing(true)
.setContentIntent(pendingIntent)
}
private fun createNotificationChannel() {
val name = "Images synchronization progress"
val descriptionText = "Channel for images synchronization progress"
val importance = NotificationManager.IMPORTANCE_LOW
val channel = NotificationChannel(notificationChannelId, name, importance).apply {
description = descriptionText
}
notificationManager.createNotificationChannel(channel)
}
}
本文标签: androidKtor HTTP request timeout only in a WorkerStack Overflow
版权声明:本文标题:android - Ktor HTTP request timeout only in a Worker - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1736678344a1947298.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论