admin管理员组

文章数量:1125959

I have an Android app that uses Ktor to make HTTP requests. Outside a Worker, the request works as expected, but when I call the request inside a worker, they always timeout: io.ktor.clientwork.sockets.ConnectTimeoutException: Connect timeout has expired [url=, connect_timeout=unknown ms]. Can't figure out why it is happening.

All functions call getClient. Outside the Worker, I call createBucket, and it works as expected. Inside the Worker I call uploadImages and downloadImages, which timeout (in getClient function request).

Class with the HTTP requests:

private class B2Client(
    val authorizationToken: String,
    val apiUrl: String,
    val accountId: String,
    val client: HttpClient
)


class BackblazeB2(val remote: Remote) :
    CloudProvider {
    private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())

    override suspend fun getRemoteImagesIds(): List<UUID> {
        val url: String = "/b2api/v3/b2_list_file_names"

        val response = scope.async(Dispatchers.IO) {
            val b2Client = getClient(scope, remote.apiKeyId, remote.apiKey).await()

            return@async b2Client.client.post(b2Client.apiUrl + url) {
                headers {
                    append(HttpHeaders.Authorization, b2Client.authorizationToken)
                }
                contentType(ContentType.Application.Json)
                setBody(
                    buildJsonObject {
                        put("bucketId", remote.bucketId)
                    }
                )
            }
        }.await()

        val body: JsonElement = response.body()

        val files = body.jsonObject.getValue("files").jsonArray

        val imagesIds = mutableListOf<UUID>()

        files.forEach { file ->
            val fileName: String = file.jsonObject.getValue("fileName").jsonPrimitive.content

            imagesIds.add(UUID.fromString(fileName))
        }

        return imagesIds
    }

    override suspend fun uploadImages(images: List<Image>, context: Context): List<Deferred<UUID>> {
        val uploadUrlUrl = "/b2api/v3/b2_get_upload_url"

        val jobs: MutableList<Deferred<UUID>> = mutableListOf()

        images.forEach { image ->
            jobs.add(scope.async(Dispatchers.IO) {
                val b2Client = getClient(scope, remote.apiKeyId, remote.apiKey).await()

                val response = b2Client.client.get(b2Client.apiUrl + uploadUrlUrl) {
                    headers {
                        append(HttpHeaders.Authorization, b2Client.authorizationToken)
                    }
                    url {
                        parameters.append("bucketId", remote.bucketId)
                    }
                }

                val body: JsonElement = response.body()

                val uploadUrl: String = body.jsonObject.getValue("uploadUrl").jsonPrimitive.content
                val authorizationToken: String =
                    body.jsonObject.getValue("authorizationToken").jsonPrimitive.content

                val imageBytes = getImageBytes(image.localPath!!, context)
                val imageSHA1 = calculateSHA1(imageBytes)

                val uploadResponse = b2Client.client.post(uploadUrl) {
                    headers {
                        append(HttpHeaders.Authorization, authorizationToken)
                        append(HttpHeaders.ContentType, image.contentType)
                        append("X-Bz-File-Name", image.uuid.toString())
                        append(HttpHeaders.ContentLength, imageBytes.size.toString())
                        append("X-Bz-Content-Sha1", sha1ToHex(imageSHA1!!))
                    }
                    setBody(imageBytes)
                }

                if (uploadResponse.status != HttpStatusCode.OK) {
                    throw Error("Image upload failed")
                }

                return@async image.uuid
            })
        }

        return jobs
    }

    override suspend fun downloadImages(
        imagesIds: List<UUID>,
        context: Context
    ): List<Deferred<Image>> {
        val downloadUrl = "/file"

        val images: MutableList<Deferred<Image>> = mutableListOf()

        imagesIds.forEach { imageId ->
            images.add(scope.async(Dispatchers.IO) {
                val b2Client = getClient(scope, remote.apiKeyId, remote.apiKey).await()

                val downloadResponse = b2Client.client.get(b2Client.apiUrl + downloadUrl) {
                    url {
                        appendPathSegments(remote.name, imageId.toString())
                    }
                    headers {
                        append(HttpHeaders.Authorization, b2Client.authorizationToken)
                    }
                }

                val imageBytes: ByteArray = downloadResponse.body()

                val contentType: String = downloadResponse.headers[HttpHeaders.ContentType]!!

                val uri = saveImageToMediaStore(
                    context,
                    imageBytes,
                    imageId.toString(),
                    contentType
                )

                return@async Image(imageId, SyncStatus.BOTH, uri, imageId.toString(), contentType)
            })
        }

        return images
    }

    companion object {
        private const val BASE_URL = ";

        suspend fun createBucket(apiKeyId: String, apiKey: String, name: String): Remote {
            val scope = CoroutineScope(Dispatchers.IO + Job())

            val url: String = "/b2api/v3/b2_create_bucket"

            try {
                val bucketId: String = scope.async(Dispatchers.IO) {
                    val b2Client = getClient(scope, apiKeyId, apiKey).await()

                    try {
                        val response = b2Client.client.post(b2Client.apiUrl + url) {
                            headers {
                                append(HttpHeaders.Authorization, b2Client.authorizationToken)
                            }
                            contentType(ContentType.Application.Json)
                            setBody(
                                buildJsonObject {
                                    put("accountId", b2Client.accountId)
                                    put("bucketName", name)
                                    put("bucketType", "allPrivate")
                                    putJsonObject("defaultServerSideEncryption") {
                                        put("mode", "SSE-B2")
                                        put("algorithm", "AES256")
                                    }
                                }
                            )
                        }

                        val body: JsonElement = response.body()

                        return@async body.jsonObject.getValue("bucketId").jsonPrimitive.content
                    } catch (e: ClientRequestException) {
                        val url = "/b2api/v3/b2_list_buckets"

                        val response = b2Client.client.post(b2Client.apiUrl + url) {
                            headers {
                                append(HttpHeaders.Authorization, b2Client.authorizationToken)
                            }
                            contentType(ContentType.Application.Json)
                            setBody(
                                buildJsonObject {
                                    put("accountId", b2Client.accountId)
                                }
                            )
                        }

                        val body: JsonElement = response.body()

                        val bucketJson =
                            body.jsonObject.getValue("buckets").jsonArray.first { elem ->
                                elem.jsonObject.getValue("bucketName").jsonPrimitive.content == name
                            }

                        return@async bucketJson.jsonObject.getValue("bucketId").jsonPrimitive.content
                    }
                }.await()

                return Remote(name, CloudProviders.BACKBLAZE, apiKeyId, apiKey, bucketId, false)
            } finally {
                scope.cancel()
            }
        }

        private fun getClient(
            scope: CoroutineScope,
            apiKeyId: String,
            apiKey: String
        ): Deferred<B2Client> {
            val url = "/b2api/v3/b2_authorize_account"

            val client = HttpClient(OkHttp) {
                expectSuccess = true
                install(ContentNegotiation) {
                    json(Json {
                        prettyPrint = true
                        isLenient = true
                        ignoreUnknownKeys = true
                    })
                }
                install(HttpTimeout) {
                    requestTimeoutMillis = 60000
                }
                install(HttpRequestRetry) {
                    maxRetries = 3
                    retryOnExceptionIf { request, cause -> cause is ConnectTimeoutException || cause is SocketTimeoutException }
                }
            }

            return scope.async(Dispatchers.IO) {
                try {
                    val response1: HttpResponse = client.get("/")

                    Log.d("BackBlaze B2", response1.status.value.toString())

                    val response: HttpResponse = client.get(BASE_URL + url) {
                        headers {
                            append(
                                HttpHeaders.Authorization,
                                "Basic" + encodeToBase64("$apiKeyId:$apiKey")
                            )
                        }
                    }

                    val jsonElement: JsonElement = response.body()

                    val authorizationToken =
                        jsonElement.jsonObject.getValue("authorizationToken").jsonPrimitive.content

                    val accountId =
                        jsonElement.jsonObject.getValue("accountId").jsonPrimitive.content

                    val apiUrl =
                        jsonElement
                            .jsonObject.getValue("apiInfo")
                            .jsonObject.getValue("storageApi")
                            .jsonObject.getValue("apiUrl")
                            .jsonPrimitive.content

                    return@async B2Client(authorizationToken, apiUrl, accountId, client)
                } catch (e: Exception) {
                    e.printStackTrace()

                    throw e
                }
            }
        }
    }

    override fun cancel() {
        scope.cancel()
    }
}

Worker:

class SyncWorker(val appContext: Context, workerParams: WorkerParameters) :
    CoroutineWorker(appContext, workerParams) {

    private val notificationManager =
        applicationContext.getSystemService(NOTIFICATION_SERVICE) as NotificationManager
    private val notificationChannelId = "image_sync_channel"
    private val notificationId = 1

    init {
        createNotificationChannel()
    }

    override suspend fun doWork(): Result = withContext(Dispatchers.IO) {
        try {
            coroutineScope {
                performSynchronization()
            }

            Result.success()
        } catch (e: Exception) {
            Result.failure()
        }
    }

    private suspend fun performSynchronization() {
        val remotesRepository = RemotesRepository(appContext)
        val imagesRepository = MediaRepository(appContext)

        val remote: Remote? = remotesRepository.getPrincipalRemote()

        if (remote == null) return

        val cloudProvider: CloudProvider = remote.getStrategy()

        try {
            val remoteImagesIds = cloudProvider.getRemoteImagesIds()

            val localImages = imagesRepository.getImagesByStatus(SyncStatus.LOCAL)

            val remoteAndLocalImagesIds =
                imagesRepository.getImagesByStatus(SyncStatus.BOTH).map { image -> image.uuid }

            val imagesToDownload = remoteImagesIds.minus(remoteAndLocalImagesIds)

            val totalSyncFiles: Int = imagesToDownload.size + localImages.size

            val uploadedImagesIds = cloudProvider.uploadImages(localImages, appContext)

            val downloadedImages = cloudProvider.downloadImages(imagesToDownload, appContext)

            val uploadJobs: MutableList<Job> = mutableListOf()
            val downloadJobs: MutableList<Job> = mutableListOf()

            var syncedFiles: Int = 0

            fun updateNotification() {
                syncedFiles++

                val messase = "A sincronizar ficheiro $syncedFiles de $totalSyncFiles"
                val progress = (syncedFiles.toFloat() / totalSyncFiles) * 100

                showNotification(progress.toInt(), messase)
            }

            supervisorScope {
                uploadedImagesIds.forEachIndexed { i, imageFuture ->
                    uploadJobs.add(launch(Dispatchers.IO) {
                        val image = imageFuture.await()

                        imagesRepository.updateImageStatus(image, SyncStatus.BOTH)

                        updateNotification()
                    })
                }

                downloadedImages.forEachIndexed { i, imageFuture ->
                    downloadJobs.add(launch(Dispatchers.IO) {
                        val image = imageFuture.await()

                        imagesRepository.addImage(image)

                        updateNotification()
                    })
                }
            }

            uploadJobs.joinAll()
            downloadJobs.joinAll()

            imagesRepository.insertSyncHystory(
                SyncHistory(
                    date = Date(),
                    syncType = SyncType.REMOTE
                )
            )
        } finally {
            cloudProvider.cancel()
        }
    }

    private fun showNotification(progress: Int, message: String) {
        if (ActivityCompat.checkSelfPermission(
                appContext,
                Manifest.permission.POST_NOTIFICATIONS
            ) != PackageManager.PERMISSION_GRANTED
        ) {
            Log.d("SyncWorker", "No permissions for post notification")

            return
        }

        val notification = createNotification(progress, message)

        notificationManager.notify(notificationId, notification.build())
    }

    private fun createNotification(progress: Int, message: String): NotificationCompat.Builder {
        val intent = Intent(appContext, MainActivity::class.java).apply {
            flags = Intent.FLAG_ACTIVITY_NEW_TASK or Intent.FLAG_ACTIVITY_CLEAR_TASK
        }
        val pendingIntent: PendingIntent =
            PendingIntent.getActivity(appContext, 0, intent, PendingIntent.FLAG_IMMUTABLE)

        return NotificationCompat.Builder(appContext, notificationChannelId)
            .setSmallIcon(R.drawable.ic_launcher_foreground) // Replace with your icon
            .setContentTitle("A sincronizar imagens...")
            .setContentText(message)
            .setProgress(100, progress, false)
            .setOngoing(true)
            .setContentIntent(pendingIntent)
    }

    private fun createNotificationChannel() {
        val name = "Images synchronization progress"
        val descriptionText = "Channel for images synchronization progress"
        val importance = NotificationManager.IMPORTANCE_LOW
        val channel = NotificationChannel(notificationChannelId, name, importance).apply {
            description = descriptionText
        }

        notificationManager.createNotificationChannel(channel)
    }
}

本文标签: androidKtor HTTP request timeout only in a WorkerStack Overflow