Background

I have an Akka actor called Client that manages HTTP and HTTPS connections to a server. The client has many features, including a ping service and a tokenFetcher service.

The client represents a 'connection' between one server and another. It is designed to let one server talk to another.

The client works as follows:

  1. Periodically ping the other server to see if it is online
  2. If the other server is online, do auth and get a token
  3. If the token is valid, clear out any calls that have been requested of us

It is step 3 I am struggling with. I would like to know how I would implement this safely between threads (actors).

What I Have Tried:

I'm using a Seq of messages that the client stores, like so:


case class SendApiCall(apiCall: ApiCall, route: String, var sent: Boolean = false)

class Client(server: Server) extends Actor {
    private var apiCalls: Seq[SendApiCall] = Seq.empty[SendApiCall]

    ...

    override def receive: Receive = {
        // bound as `call` so the binding does not shadow the sendApiCall method
        case call@SendApiCall(_, _, _) =>
            if (server.onlineStatus == OFFLINE) {
                apiCalls = apiCalls.appended(call)
            }
            else {
                sendApiCall(call)
            }
        
        case ServerOnline() => // <- this is sent to us from the ping service when it first detects the server is online
            
            apiCalls.foreach { apiCallRequest =>
                if (!apiCallRequest.sent) {
                    sendApiCall(apiCallRequest)
                    apiCallRequest.sent = true
                }
            }

            apiCalls = apiCalls.filterNot(apiCallRequest => apiCallRequest.sent)
    }
}

However, I believe apiCalls is mutable state in this scenario. I would like to know:

  1. Is this thread safe?
  2. How would I make it thread safe, if it isn't?

Asked Feb 3 at 12:04 by Kris Rice; edited Feb 3 at 13:17.

2 Answers

Given that your code is running inside an actor, and you seem to be following "the rules", it looks thread-safe to me.

Actors work in terms of a "mailbox", which can receive messages from many sources, possibly concurrently, but will only dispatch the messages to your actor's receive function sequentially. So while the receive function may not always run on the same thread, you can effectively consider your actor as single-threaded. As long as you don't do anything to make code inside your receive function run concurrently with itself, the sequential-ness of the actor/mailbox model makes it safe.
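To make the mailbox point concrete, here is a minimal sketch, assuming Akka classic is on the classpath. The `Counter` actor, the `Increment` and `GetCount` messages, and `MailboxDemo` are hypothetical names made up for this illustration. Several threads send to the same actor at once, and the actor still updates a plain `var` without any locking:

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical messages, for illustration only
case object Increment
case object GetCount

class Counter extends Actor {
  // Plain var, no locks: only `receive` touches it, one message at a time
  private var count = 0

  override def receive: Receive = {
    case Increment => count += 1
    case GetCount  => sender() ! count
  }
}

object MailboxDemo {
  def run(senders: Int, perSender: Int): Int = {
    val system = ActorSystem("mailbox-demo")
    import system.dispatcher // ExecutionContext for the sending Futures
    implicit val timeout: Timeout = Timeout(5.seconds)
    try {
      val counter = system.actorOf(Props(new Counter), "counter")
      // Many threads send concurrently; the mailbox hands the messages
      // to `receive` one after another
      val sends = Future.traverse((1 to senders).toList) { _ =>
        Future { (1 to perSender).foreach(_ => counter ! Increment) }
      }
      Await.result(sends, 5.seconds)
      // Asked only after every send has completed
      Await.result((counter ? GetCount).mapTo[Int], 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

With a local actor and the default mailbox, `MailboxDemo.run(8, 1000)` should come back as 8000: no increment is lost even though eight threads were sending at the same time.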

To illustrate something that would compromise your thread safety, consider a function that you could call to trigger some asynchronous action with a callback; if your callback was able to directly interact with your actor's internal state, that would be non-thread-safe:

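import akka.actor.Actor
import scala.concurrent.Future

def exampleAsyncFunction: Future[Int] = ???

class MyActor extends Actor {
  import context.dispatcher // ExecutionContext that runs the Future callback

  private var myInternalState: Int = 0

  override def receive = {
    case SomeMessage =>
      // BAD!
      // (foreach passes the Int result to the callback once the Future
      // succeeds; onComplete would pass a Try[Int] instead)
      exampleAsyncFunction.foreach { i =>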
        // here, you've captured a reference to this actor, and
        // are directly manipulating its internal state, possibly
        // from outside the sequential context of the `receive` function
        myInternalState += i
      }

    case SomeOtherMessage =>
      // OK
      myInternalState += 1
  }
}

The above example would not be thread-safe, because the async callback function triggered by SomeMessage could potentially execute concurrently with the receive logic for SomeOtherMessage, causing conflicting modifications to myInternalState.

A potential way to make the above example safe is to have the async callback interact with the actor's mailbox instead of directly with its internal state:

def receive = {

  case SomeMessage => 
    val me = self // capture `self` before going into async context

    // foreach passes the Int result on success (onComplete would pass a Try[Int])
    exampleAsyncFunction.foreach { i =>
      // BETTER - sending messages to an actor is safe
      me ! IncrementCounter(i)
    }

  case IncrementCounter(i) =>
    // safe to modify internal state because it is a direct response
    // to a received message, as part of the receive loop
    myInternalState += i

  ...
}
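Akka also ships a helper for exactly this hand-off, `pipeTo` from `akka.pattern.pipe`, so you don't have to capture `self` yourself. A minimal sketch, assuming Akka classic; `PipingActor`, `GetState`, and passing `exampleAsyncFunction` in as a constructor parameter are choices made for this illustration, not part of the original example:

```scala
import akka.actor.{Actor, ActorLogging, Status}
import akka.pattern.pipe
import scala.concurrent.Future

// Hypothetical messages, named after the ones used in the example above
case object SomeMessage
case object GetState // hypothetical, only here so the state can be inspected
final case class IncrementCounter(amount: Int)

// `exampleAsyncFunction` is passed in as a parameter (an assumption of this sketch)
class PipingActor(exampleAsyncFunction: () => Future[Int])
    extends Actor with ActorLogging {
  import context.dispatcher // ExecutionContext needed by `map` and `pipeTo`

  private var myInternalState: Int = 0

  override def receive: Receive = {
    case SomeMessage =>
      // Wrap the result in a message and deliver it to our own mailbox;
      // the callback never touches the actor's state directly
      exampleAsyncFunction().map(IncrementCounter(_)).pipeTo(self)

    case IncrementCounter(i) =>
      // Runs inside the receive loop, so mutating state is safe
      myInternalState += i

    case Status.Failure(cause) =>
      // pipeTo delivers a failed Future as a Status.Failure message
      log.error(cause, "async call failed")

    case GetState =>
      sender() ! myInternalState
  }
}
```

The one thing to remember with `pipeTo` is the failure case: a failed Future arrives as a `Status.Failure` message, so the actor should handle it or it ends up as an unhandled message.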

In the example you gave in your post, it doesn't look like you are doing anything to cause concurrent modifications to your internal state, so it is thread-safe.
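If the mutable `sent` flag on the message still bothers you, one option is to keep the pending calls in an immutable queue and carry it through `context.become`. This is only a sketch under some assumptions: `ApiCall` is a stand-in for your real type, the `send` function replaces your `sendApiCall` method, and a `ServerOffline` message (hypothetical, not in your post) replaces the `server.onlineStatus` check:

```scala
import akka.actor.Actor
import scala.collection.immutable.Queue

// Hypothetical stand-ins for the types in the question
final case class ApiCall(payload: String)
final case class SendApiCall(apiCall: ApiCall, route: String) // no mutable `sent` flag
case object ServerOnline
case object ServerOffline // assumed to come from the ping service as well

// `send` stands in for the question's sendApiCall method
class Client(send: SendApiCall => Unit) extends Actor {
  // Start offline with nothing buffered
  override def receive: Receive = offline(Queue.empty)

  // While offline, buffer calls in an immutable queue carried by the behavior
  private def offline(pending: Queue[SendApiCall]): Receive = {
    case call: SendApiCall =>
      context.become(offline(pending.enqueue(call)))
    case ServerOnline =>
      pending.foreach(send) // flush in arrival order
      context.become(online)
    case ServerOffline => // already offline, nothing to do
  }

  // While online, send straight away
  private def online: Receive = {
    case call: SendApiCall => send(call)
    case ServerOffline     => context.become(offline(Queue.empty))
    case ServerOnline      => // already online, nothing to flush
  }
}
```

Here the queue is never mutated and there is nothing to filter afterwards: once the buffered calls are flushed, the actor simply switches to the `online` behavior. It is exactly as thread-safe as your version, just with less state to keep track of.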

Akka actors are inherently thread-safe with regard to their internal state: the receive method cannot be invoked on the same actor instance concurrently. As such, your approach works from a thread-safety perspective.

Tags: multithreading. Source: "How to correctly (and safely) manage a Seq of objects in an akka actor", Stack Overflow.