I want to update a broadcast variable (here, an API token) based on the token's lifespan.

import asyncio
import functools
import time

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

api_token = {"token": "initial_token"}
broadcast_token = None
keep_running = True

async def renew_api_token():
    """Simulates API token renewal asynchronously."""
    global api_token, broadcast_token
    print("Renewing API token on the driver...")
    api_token["token"] = f"new_token_{int(time.time())}"
    if broadcast_token:
        broadcast_token.unpersist()  # Remove the old broadcast variable
    broadcast_token = spark.sparkContext.broadcast(api_token["token"])
    print(f"New token broadcasted: {api_token['token']}")

async def token_renewal_loop():
    """Background loop to renew the API token every 5 seconds."""
    global keep_running
    while keep_running:
        await renew_api_token()
        await asyncio.sleep(5)  # Simulating token lifespan

def with_api_token(func):
    """Decorator to inject the latest broadcast token into the function."""
    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        # Access the value of the broadcast variable
        token_value = broadcast_token.value if broadcast_token else "no_token"
        kwargs["api_token"] = token_value
        return func(*args, **kwargs)
    return wrapper

@with_api_token
def update_batch(data, api_token=None):
    """Simulates a batch update with the API token."""
    print(f"Processing data: {data} with API token: {api_token}")
    time.sleep(20)  # Simulate processing time
    return f"Processed {data} with token {api_token}"

With a batch processing time of 20 seconds and a token lifespan of 5 seconds, I expect the token to be renewed 4 times during the batch. Unfortunately, I am just learning asyncio and am running into complications completing the task.

The idea is to run token_renewal_loop in the background on the driver and run update_batch on the executor nodes. Whenever a renewal point is reached (i.e. every 5 seconds), update_batch should pause while token_renewal_loop generates a new token.
The code above is all I have been able to come up with so far.
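Part of the difficulty is asyncio itself: a blocking call such as `time.sleep(20)` stalls the whole event loop, so a renewal coroutine can never run at the same time. The following is a minimal, driver-local sketch (no Spark involved) that runs the blocking work in a worker thread with `asyncio.to_thread` (Python 3.9+) so the renewal loop keeps ticking. The names `renew_token`, `blocking_batch` and `state`, and the shortened timings, are hypothetical stand-ins for the question's functions. It only illustrates concurrency on one machine; it does not make Spark tasks that are already running see a new broadcast value.

```python
import asyncio
import time

# Hypothetical, shortened timings (the question uses 5 s and 20 s).
TOKEN_LIFESPAN = 0.05
BATCH_DURATION = 0.2

# Hypothetical driver-side state standing in for api_token.
state = {"token": "initial_token", "renewals": 0}


async def renew_token() -> None:
    """Hypothetical renewal: replace the token on the driver."""
    state["renewals"] += 1
    state["token"] = f"new_token_{state['renewals']}"


async def token_renewal_loop(stop: asyncio.Event) -> None:
    """Renew the token every TOKEN_LIFESPAN seconds until `stop` is set."""
    while not stop.is_set():
        await renew_token()
        try:
            # Return early if stop is set, otherwise time out after one lifespan.
            await asyncio.wait_for(stop.wait(), timeout=TOKEN_LIFESPAN)
        except asyncio.TimeoutError:
            pass


def blocking_batch(data: str) -> str:
    """Hypothetical stand-in for update_batch: blocking work kept off the event loop."""
    time.sleep(BATCH_DURATION)
    return f"Processed {data} with token {state['token']}"


async def main() -> str:
    stop = asyncio.Event()
    renewer = asyncio.create_task(token_renewal_loop(stop))
    # Run the blocking call in a worker thread so the renewal loop keeps running.
    result = await asyncio.to_thread(blocking_batch, "batch-1")
    stop.set()
    await renewer
    return result


if __name__ == "__main__":
    print(asyncio.run(main()))
    print("renewals:", state["renewals"])
```

If `time.sleep` were called directly inside a coroutine instead of through `asyncio.to_thread`, the renewal loop would not get a turn until the batch finished.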

Asked Jan 24 at 11:31 by user13; edited Jan 24 at 12:40.
  • These types of things are never 100% foolproof I would state. – Ged Commented Jan 26 at 13:04
1 Answer

With a 5-second token and a 20-second batch, as you describe, it is not possible to push token updates out from the driver while the batch is running.

Broadcast variables can only be "updated" from the driver between actions, by creating a new broadcast. Attempting to do it in the background while tasks are running will inevitably lead to cases where either the variable has been unpersisted (forcing the old value to be re-sent) or the task keeps using the old token.

Because your token is shorter-lived than the entire batch, you will need a separate token for each executor or partition. You could start with a shared token and then detect its expiry within the batch, refreshing it there (assuming the token is used in a map function or UDF, since the broadcast token is only visible to user code).
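A minimal sketch of that idea, assuming the token is consumed inside a partition-level function. Everything here is hypothetical: `process_partition` is an illustrative name, `fetch_new_token` stands in for your real token endpoint, and the token is assumed to be valid for a fixed lifespan from the moment the partition starts. Each partition begins with the shared (broadcast) token and refreshes its own local copy when it expires, instead of waiting for the driver.

```python
import time
from typing import Callable, Iterable, Iterator, Tuple

TOKEN_LIFESPAN = 5.0  # seconds, as in the question


def fetch_new_token() -> str:
    """Hypothetical stand-in for a real token endpoint called from the executor."""
    return f"new_token_{time.monotonic_ns()}"


def process_partition(
    records: Iterable[str],
    initial_token: str,
    lifespan: float = TOKEN_LIFESPAN,
    fetch: Callable[[], str] = fetch_new_token,
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[Tuple[str, str]]:
    """Hypothetical helper: process one partition, refreshing the token locally.

    Assumes the shared token is fresh when the partition starts; a real
    implementation would use the expiry reported by the token service.
    """
    token = initial_token
    issued_at = clock()
    for record in records:
        if clock() - issued_at >= lifespan:
            token = fetch()  # refresh on the executor, not on the driver
            issued_at = clock()
        # Hypothetical per-record API call; here we just pair record and token.
        yield record, token
```

On Spark this could be wired up as `rdd.mapPartitions(lambda it: process_partition(it, broadcast_token.value))`, reading `broadcast_token.value` inside the lambda so it is resolved on the executor. The driver then never has to re-broadcast mid-action; the trade-off is up to one extra token request per partition per lifespan.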

Tags: multithreading | Spark asynchronously update the broadcast variable - Stack Overflow