Source code for celeryviz.server

from typing import List, Optional
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
import asyncio
from uvicorn import Config, Server as UvicornServer
import logging
from fastapi import FastAPI
from celeryviz.data_service import AbstractEventSink, SocketioEventSink
from celeryviz.constants import DEFAULT_PORT

banner_template = f"""
==================================
        🎉 App Launched!
==================================
🌐 URL: http://localhost:%d/app/
==================================
"""

import logging
from .config import settings

logger = logging.getLogger(__name__)

[docs] class Server: def __init__(self, loop: asyncio.AbstractEventLoop, port: int = None, event_data_sinks: Optional[List[AbstractEventSink]] = None): self.app = FastAPI() self.loop = loop self.port = port or settings.port self.event_data_sinks = event_data_sinks or [] self._mount_socketio_app() self.app.get("/app/", response_class=HTMLResponse)(self.frontend_app) self.app.mount("/", StaticFiles(directory="celeryviz/static"), name="static") def _mount_socketio_app(self): socketio_sink = next((sink for sink in self.event_data_sinks if isinstance(sink, SocketioEventSink)), None) if socketio_sink: self.app.mount("/socket.io", socketio_sink.socket_app)
[docs] def frontend_app(self): return HTMLResponse(content=open("celeryviz/static/index.html").read(), status_code=200)
[docs] async def event_handler(self, data): """ The `dump_events` if called for each event received for now. This can be optimized to batch process events in future. """ sink_datadump_futures = [sink.dump_events([data]) for sink in self.event_data_sinks] await asyncio.gather(*sink_datadump_futures)
[docs] def start(self): banner = banner_template % self.port logger.info(banner) config = Config(app=self.app, host='0.0.0.0', port=self.port) server = UvicornServer(config=config) self.loop.run_until_complete(server.serve())