I want to update a Spark broadcast variable, in this case an API token, based on the token's lifespan.
import asyncio
import time

# `spark` is assumed to be an existing SparkSession created on the driver.

api_token = {"token": "initial_token"}
broadcast_token = None
keep_running = True

async def renew_api_token():
    """Simulates API token renewal asynchronously."""
    global api_token, broadcast_token
    print("Renewing API token on the driver...")
    api_token["token"] = f"new_token_{int(time.time())}"
    if broadcast_token:
        broadcast_token.unpersist()  # Remove the old broadcast variable
    broadcast_token = spark.sparkContext.broadcast(api_token["token"])
    print(f"New token broadcasted: {api_token['token']}")

async def token_renewal_loop():
    """Background loop to renew the API token every 5 seconds."""
    global keep_running
    while keep_running:
        await renew_api_token()
        await asyncio.sleep(5)  # Simulating token lifespan

def with_api_token(func):
    """Decorator to inject the latest broadcast token into the function."""
    def wrapper(*args, **kwargs):
        # Access the value of the broadcast variable
        token_value = broadcast_token.value if broadcast_token else "no_token"
        kwargs["api_token"] = token_value
        return func(*args, **kwargs)
    return wrapper

@with_api_token
def update_batch(data, api_token=None):
    """Simulates a batch update with the API token."""
    print(f"Processing data: {data} with API token: {api_token}")
    time.sleep(20)  # Simulate processing time
    return f"Processed {data} with token {api_token}"
With a batch processing time of 20 seconds and a token lifespan of 5 seconds, I expect the token to be renewed 4 times while the batch is being processed. Unfortunately, I am just learning asyncio and I am running into complications completing the task.
The idea is to run token_renewal_loop in the background on the driver while update_batch runs on the executor nodes. Every time the renewal point is reached, i.e. 5 seconds elapse, update_batch should pause and token_renewal_loop should generate a new token.
The above code is all that I was able to come up with.
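On the driver I have been trying to wire it up roughly like this (a sketch that does not behave the way I expect; it assumes Python 3.9+ for asyncio.to_thread, and the sample data is just a placeholder):

async def main():
    # Start the renewal loop in the background on the driver
    renewal_task = asyncio.create_task(token_renewal_loop())

    # Run the blocking batch in a worker thread so the event loop keeps running
    result = await asyncio.to_thread(update_batch, ["batch_1"])
    print(result)

    # Stop the renewal loop once the batch has finished
    global keep_running
    keep_running = False
    renewal_task.cancel()

asyncio.run(main())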
- These types of things are never 100% foolproof I would state. – Ged Commented Jan 26 at 13:04
1 Answer
As you've described it, with a 5 second token but a 20 second batch, it is not possible to push updates out from the driver.
Broadcast variables can be "updated" from the driver between actions; attempting to do it in the background will inevitably lead to cases where the variable has already been unpersisted (forcing a resend of the old value) or a task still uses the old token.
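A minimal sketch of that between-actions pattern (assuming the 20 second batch can be split into several smaller actions; the chunking here is purely illustrative):

chunks = [["a"], ["b"], ["c"], ["d"]]

for chunk in chunks:
    # Renew and rebroadcast on the driver while no job is running
    token = f"new_token_{int(time.time())}"
    broadcast_token = spark.sparkContext.broadcast(token)

    # Every task in this action sees one consistent token for its whole duration
    rdd = spark.sparkContext.parallelize(chunk)
    print(rdd.map(lambda x, bt=broadcast_token: f"Processed {x} with {bt.value}").collect())

    # Only drop the old broadcast once the action that used it has finished
    broadcast_token.unpersist()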
Given your token is shorter lived than the entire batch, you'll have to have a unique token per executor / partition: you could start with a shared broadcast token and then handle expiry within the batch itself (assuming the token is used in a map function or UDF, since the broadcast value is only visible to user code).
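If you go the per-partition route, a rough sketch (fetch_token is a hypothetical helper standing in for however you actually obtain a fresh token from your auth service):

import time

TOKEN_LIFESPAN = 5  # seconds

def fetch_token():
    # Hypothetical: call your auth endpoint from within the executor
    return f"executor_token_{int(time.time())}"

def process_partition(rows):
    token = fetch_token()
    issued_at = time.time()
    for row in rows:
        # Renew inside the partition once the current token has expired
        if time.time() - issued_at >= TOKEN_LIFESPAN:
            token = fetch_token()
            issued_at = time.time()
        yield f"Processed {row} with token {token}"

result = spark.sparkContext.parallelize(range(8), 4).mapPartitions(process_partition).collect()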