admin管理员组

文章数量:1124374

I have a loop in a Django application where I process a list of records and send periodic progress updates to the frontend. I'm using async_to_sync from Django Channels to send messages through the channel_layer (RedisChannelLayer). Here's the relevant code snippet:

# consumers.py
import json

from channels.generic.websocket import AsyncWebsocketConsumer

class TaskProgressConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.task_id = self.scope['url_route']['kwargs']['task_id']
        self.group_name = f'task_{self.task_id}'

        await self.channel_layer.group_add(
            self.group_name,
            self.channel_name
        )

        await self.accept()

    async def disconnect(self, close_code):
        # Leave the group when the WebSocket disconnects
        await self.channel_layer.group_discard(
            self.group_name,
            self.channel_name
        )

    async def receive(self, text_data):
        pass

    # This method will send progress updates to the WebSocket group
    async def send_task_progress(self, event):
        await self.send(text_data=json.dumps(event['message']))
# tasks.py
@shared_task(bind=True)
def run_task(self):
    ...
    channel_layer = get_channel_layer()
    task_id = str(self.request.id)

    for i, row in enumerate(records, start=1):
        ...
        if idx % 10 == 0:
            async_to_sync(channel_layer.group_send)(
                f'task_{task_id}',
                {"type": "send_task_progress", "message": {"progress": idx + 1, "total": len(records)}}
            )

The code generally works, but I'm noticing a stuttering issue:

The loop processes the first few batches of records (e.g., the first 10-20 updates) without issues. Then, it blocks for a couple of seconds before resuming progress. This pattern of smooth processing followed by intermittent delays continues throughout the loop.

Additionally, I’ve observed the following:

  • The Celery worker container spikes to over 150% CPU usage when processing tasks with over 1,000 iterations.
  • Redis, which I’m using as the channel layer backend, is running locally on my development machine.

More context:

  • Using Django Channels with Redis for WebSocket communication.

  • len(records) can be large, sometimes in the thousands.

  • The frontend is subscribed to updates via a WebSocket and does receive messages successfully.

  • Reducing the frequency of updates (e.g., updating progress every 100 records instead of every 10) mitigates the stuttering slightly but doesn’t eliminate it.

  • I've monitored Redis and my system’s memory usage during the process, and they seem stable.

  • I suspect that async_to_sync or channel_layer.group_send is contributing to the delay, perhaps due to its blocking behavior or the CPU-intensive workload.

本文标签: