Background Jobs =============== *Asynchronous Processing with Real-Time Updates* In this section, we'll integrate background job processing with WebSocket communication. This powerful pattern allows you to offload long-running tasks from WebSocket connections while providing real-time updates when jobs complete. Understanding Background Jobs with WebSockets --------------------------------------------- **The Challenge**: WebSocket connections should remain responsive and not block on long-running operations like AI processing, file uploads, or external API calls. **The Solution**: Use background workers to process jobs asynchronously while maintaining real-time communication with clients through channel layers. **Key Benefits:** - **Non-blocking**: WebSocket stays responsive while jobs run in background - **Scalable**: Multiple workers can process jobs concurrently - **Real-time Feedback**: Clients receive results immediately when jobs complete - **Cross-process Communication**: Workers and WebSocket servers can run on different machines **Architecture Overview:** 1. Client sends message to WebSocket consumer 2. Consumer queues a background job and responds immediately 3. Background worker processes the job asynchronously 4. Worker sends results back to client via channel layers 5. Client receives real-time updates when job completes Understanding sync_to_async and async_to_sync --------------------------------------------- When integrating async WebSocket consumers with sync background workers, we need utilities to bridge the synchronous and asynchronous worlds: **async_to_sync**: Converts async functions to sync (mainly used here) - Background workers are typically sync functions - Channel layer operations are async, so we wrap them with ``async_to_sync()`` - Example: ``async_to_sync(channel_layer.send)(channel_name, message)`` **sync_to_async**: Converts sync functions to async - Used when calling sync database operations from async consumers - Example: ``await sync_to_async(User.objects.get)(id=user_id)`` Both utilities come from ``asgiref.sync`` and are essential for mixing sync/async code in Django-style applications. Creating Background Tasks ------------------------- First, let's create our background task definitions. Create a new file at ``sandbox/tasks.py``: .. raw:: html
Background Tasks Code .. literalinclude:: ../../sandbox/tasks.py :language: python .. raw:: html
This tasks file provides: - **Multiple Job Types**: Translation, text analysis, AI response generation, and default processing - **Redis Queue Integration**: Uses RQ (Redis Queue) for job management - **Channel Layer Communication**: Jobs send results back to WebSocket clients using ``async_to_sync`` - **Job Dispatcher**: ``queue_job()`` function routes different job types to appropriate handlers - **Error Handling**: Graceful handling of channel layer communication errors **Key Components:** - ``JOB_FUNCTIONS``: Maps job types to processing functions - ``_send_result_to_client()``: Uses channel layers to deliver results back to WebSocket - ``setup_layers()``: Ensures channel layers are available in worker processes Creating the Background Worker ------------------------------ Next, create the worker script at ``sandbox/worker.py``: .. raw:: html
Background Worker Code .. literalinclude:: ../../sandbox/worker.py :language: python .. raw:: html
This worker script: - **Monitors Redis Queue**: Continuously polls for new jobs to process - **Channel Layer Setup**: Initializes Fast Channels layers for result communication - **Graceful Shutdown**: Handles Ctrl+C and system signals properly - **Connection Testing**: Verifies Redis connectivity before starting - **Error Handling**: Provides clear feedback for connection and processing issues Creating the Development Launcher --------------------------------- For easier development, create ``sandbox/start_dev.py`` to run both server and worker together: .. raw:: html
Development Launcher Code .. literalinclude:: ../../sandbox/start_dev.py :language: python .. raw:: html
This launcher script: - **Dual Process Management**: Starts both RQ worker and FastAPI server - **Coordinated Shutdown**: Stops both processes gracefully with Ctrl+C - **Process Monitoring**: Restarts if either process dies unexpectedly - **Development Optimized**: Includes live reload for the FastAPI application Creating the Background Jobs Consumer ------------------------------------- Now let's create the WebSocket consumer that integrates with our background jobs. First, create the module structure: .. code-block:: bash # Create the background_jobs module directory mkdir -p sandbox/apps/background_jobs # Create __init__.py file to make it a Python module touch sandbox/apps/background_jobs/__init__.py Create a new file at ``sandbox/apps/background_jobs/consumer.py``: .. raw:: html
Background Jobs Consumer Code .. literalinclude:: ../../sandbox/tutorial/apps/background_jobs/consumer.py :language: python .. raw:: html
Understanding the Background Jobs Consumer ------------------------------------------ Let's break down how this consumer works: **JSON-First Design:** - Extends ``AsyncJsonWebsocketConsumer`` for automatic JSON parsing - Expects structured messages: ``{"type": "translate", "content": "hello"}`` - All responses are JSON with consistent format: ``{"status": "queuing", "message": "...", "job_id": "..."}`` **Job Queuing:** - ``queue_job()`` sends work to background queue immediately - Returns job ID for tracking and client-side status updates - Consumer remains responsive during job processing with immediate feedback **Structured Response Flow:** 1. ``{"status": "queuing"}`` - Job is being processed 2. ``{"status": "queued", "job_id": "abc123"}`` - Job successfully queued 3. ``{"type": "job_result", "message": "..."}`` - Final results from worker **Result Handling:** - ``job_result()`` method receives results from background workers - Background workers use channel layers to send results back - Real-time delivery to the specific client that requested the job **Channel Layer Usage:** - Uses "chat" layer (same as room chat) for result communication - Background workers send messages to specific channel names - Enables cross-process communication between workers and WebSocket server Integrating the Consumer ------------------------ Connect the background jobs consumer to your FastAPI application. Open ``sandbox/main.py`` and uncomment these lines: .. code-block:: python # Uncomment this import line: from sandbox.apps.background_jobs.consumer import BackgroundJobConsumer # Uncomment this WebSocket route: ws_router.add_websocket_route("/backgroundjob", BackgroundJobConsumer.as_asgi()) These lines add a WebSocket endpoint at ``/ws/backgroundjob`` that handles background job processing. Testing Your Background Jobs ---------------------------- Your final project structure should now look like this: .. code-block:: text tutorial-project/ ├── docker-compose.yml └── sandbox/ ├── __init__.py ├── main.py ├── layers.py ├── tasks.py ├── worker.py ├── start_dev.py ├── apps/ │ ├── __init__.py │ ├── background_jobs/ │ │ ├── __init__.py │ │ └── consumer.py │ ├── room_chat/ │ │ ├── __init__.py │ │ └── consumer.py │ └── system_chat/ │ ├── __init__.py │ └── consumer.py └── static/ ├── css/ │ └── style.css └── js/ └── main.js 1. **Stop any running servers:** If you have a server running from previous tutorials, stop it first (Ctrl+C) to avoid port conflicts. 2. **Start the development environment:** .. code-block:: bash # Use the development launcher (recommended) python sandbox/start_dev.py # OR start components separately: # Terminal 1: python sandbox/worker.py # Terminal 2: uvicorn sandbox.main:app --reload --port 8080 3. **Test the background job processing:** Visit http://localhost:8080 in your browser. You should see the "Background Job Processing" section. 4. **Try different job types:** - Select "Default Processing" and send a message - Select "Translation" and try words like "hello", "world", "thank you" - Select "Text Analysis" and send any text to get detailed analysis - Select "AI Generation" and mention topics like "weather", "food", or "help" **Expected Behavior:** - Immediate JSON confirmation that job was queued with status indicators - Job ID display for tracking (shown in parentheses) - Real-time result delivery when processing completes with ✅ indicator - Different processing times for different job types (1-4 seconds) - Structured JSON responses for better client-side handling .. image:: ../_static/background-jobs.png :alt: Background Jobs demo showing asynchronous processing with real-time updates :align: center Customizing Your Background Jobs -------------------------------- The system includes several areas for customization: **1. Adding New Job Types:** .. code-block:: python # In sandbox/tasks.py, add new job function: def process_image(job_id: str, image_data: str, channel_name: str) -> dict: # Your image processing logic here time.sleep(5) # Simulate processing result = f"🖼️ Processed image: {len(image_data)} bytes" _send_result_to_client(channel_name, result) return {"status": "completed", "result": result} # Add to JOB_FUNCTIONS dictionary: JOB_FUNCTIONS = { "translate": translate_text, "analyze": analyze_text, "generate": generate_response, "image": process_image, # New job type "default": process_default, } **2. Custom Job Processing Logic:** .. code-block:: python # In consumer.py, add preprocessing: async def receive_json(self, content: dict[str, Any], **kwargs: Any) -> None: # Add validation if not content.get("content") or not content.get("content", "").strip(): await self.send_json({"status": "error", "message": "❌ Empty message not allowed"}) return # Add rate limiting if hasattr(self, 'last_job_time'): if time.time() - self.last_job_time < 1: # 1 second cooldown await self.send_json({"status": "error", "message": "⏳ Please wait before sending another job"}) return self.last_job_time = time.time() # Continue with normal processing... **3. Alternative Queue Systems:** Instead of RQ, you can use other systems: .. code-block:: python # Celery example: from celery import Celery app = Celery('tasks', broker='redis://localhost:6379') @app.task def process_with_celery(content, channel_name): # Your processing logic result = f"Processed by Celery: {content}" _send_result_to_client(channel_name, result) return result # In consumer: def queue_job(job_type, content, channel_name): process_with_celery.delay(content, channel_name) **4. Enhanced Result Formatting:** .. code-block:: python # In tasks.py, customize result delivery: def _send_result_to_client(channel_name: str, message: str): timestamp = datetime.datetime.now().strftime("%H:%M:%S") formatted_message = f"[{timestamp}] {message}" try: channel_layer = get_channel_layer("chat") async_to_sync(channel_layer.send)( channel_name, { "type": "job_result", "message": formatted_message, "timestamp": timestamp } ) except Exception as e: print(f"Error sending result: {e}") Troubleshooting --------------- **Jobs Not Processing:** Make sure the RQ worker is running: check the terminal output from ``python sandbox/start_dev.py`` **Results Not Appearing:** Verify Redis is running: ``docker compose ps`` should show Redis container **Import Errors:** Ensure both import and route lines are uncommented in ``sandbox/main.py`` **Worker Connection Failed:** Check Redis connection - worker uses port 6379 (standard), channels use 6399 **Slow Job Processing:** This is intentional - jobs have artificial delays (1-4 seconds) to demonstrate async processing What's Next? ------------ Excellent! You've built a complete background job processing system with real-time WebSocket communication. You now understand: ✅ How to offload long-running tasks from WebSocket connections ✅ Background job queuing with Redis Queue (RQ) ✅ Cross-process communication using channel layers ✅ Real-time result delivery to specific clients ✅ Integration of sync workers with async WebSocket consumers ✅ Development workflow with coordinated server and worker processes This pattern is essential for building scalable real-time applications that handle intensive processing without blocking user interactions. In the final section, we'll explore advanced channel layer patterns and external integrations. Continue to :doc:`showcase` to learn advanced channel layer patterns and external integrations.