Source code for celeryviz.data_service.base

from typing import List


[docs] class AbstractEventSink: """Abstract service for dumping event data."""
[docs] async def dump_events(self, events: List[dict]): """Dump a list of event dictionaries to the data store.""" raise NotImplementedError("Subclasses must implement dump_events method.")
[docs] class AbstractEventRetriever: """ Abstract service for querying event data. Not in use currently, will be used when we start sending stored events to clients. """
[docs] async def fetch_events(self, *args, **kwargs) -> List[dict]: """Receive a list of event dictionaries from the data source.""" raise NotImplementedError("Subclasses must implement fetch_events method.")