admin管理员组文章数量:1302292
I'm kind of new to faust, and I need my app to handle kakfa messages using different consumer groups. I need to have two different consumer groups in the same program, but since the consumer group for each kafka topic is the same as the faust app id, I need two apps running on the same program. My problem is that I don't know how to initialize the two apps in the command line. This doesn't work, for example: faust -A test_faust:app1,app2 worker This is an example of what I'm trying to do:
## Script test_faust.py
import faust
app1 = faust.App('consumer_group1', broker='kafka://localhost:9092', value_serializer='json', consumer_auto_offset_reset='earliest')
app2 = faust.App('consumer_group2', broker='kafka://localhost:9092', value_serializer='json', consumer_auto_offset_reset='earliest')
topic1 = app1.topic('topic1', value_type=str)
topic2 = app2.topic('topic2', value_type=str)
@app1.agent(topic1)
async def process1(stream):
async for value in stream:
print(f'App1: {value}')
@app2.agent(topic2)
async def process2(stream):
async for value in stream:
print(f'App2: {value}')
Any ideas how I can initialize both apps with the same command?Or alternatively, using the same app, how can i create a different consumer groups for each topic? Is there an option to do something like
topic1 = app.topic('topic1', value_type=str, consumer_id='consumer_group1')
topic2 = app.topic('topic2', value_type=str, consumer_id='consumer_group2')
Or any way to start the two apps in the script via the command line? I'm trying
faust -A test_faust:app1,app2 worker
but gives error
ValueError: Component 'app1,app2' of 'test_faust:app1,app2' is not a valid identifier
Thanks
I'm kind of new to faust, and I need my app to handle kakfa messages using different consumer groups. I need to have two different consumer groups in the same program, but since the consumer group for each kafka topic is the same as the faust app id, I need two apps running on the same program. My problem is that I don't know how to initialize the two apps in the command line. This doesn't work, for example: faust -A test_faust:app1,app2 worker This is an example of what I'm trying to do:
## Script test_faust.py
import faust
app1 = faust.App('consumer_group1', broker='kafka://localhost:9092', value_serializer='json', consumer_auto_offset_reset='earliest')
app2 = faust.App('consumer_group2', broker='kafka://localhost:9092', value_serializer='json', consumer_auto_offset_reset='earliest')
topic1 = app1.topic('topic1', value_type=str)
topic2 = app2.topic('topic2', value_type=str)
@app1.agent(topic1)
async def process1(stream):
async for value in stream:
print(f'App1: {value}')
@app2.agent(topic2)
async def process2(stream):
async for value in stream:
print(f'App2: {value}')
Any ideas how I can initialize both apps with the same command?Or alternatively, using the same app, how can i create a different consumer groups for each topic? Is there an option to do something like
topic1 = app.topic('topic1', value_type=str, consumer_id='consumer_group1')
topic2 = app.topic('topic2', value_type=str, consumer_id='consumer_group2')
Or any way to start the two apps in the script via the command line? I'm trying
faust -A test_faust:app1,app2 worker
but gives error
ValueError: Component 'app1,app2' of 'test_faust:app1,app2' is not a valid identifier
Thanks
Share edited Feb 21 at 15:12 VMGA asked Feb 11 at 10:10 VMGAVMGA 13 bronze badges1 Answer
Reset to default 0I found a solution. The key is to pass the two apps to the worker:
import asyncio
import nest_asyncio
nest_asyncio.apply()
async def start_worker(worker: faust.Worker) -> None:
await worker.start()
def manage_loop():
loop = asyncio.get_event_loop_policy().get_event_loop()
try:
worker = faust.Worker(*[app1, app2], loop=loop)
loop.run_until_complete(start_worker(worker))
loop.run_forever()
finally:
worker.stop_and_shutdown()
worker.close()
if __name__=='__main__':
manage_loop()
And then to start the app you need to call
python test_faust.py worker
本文标签: How to create different consumer groups in the same faust app (using kafka and python)Stack Overflow
版权声明:本文标题:How to create different consumer groups in the same faust app (using kafka and python) - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1741666292a2391328.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论