Source code for celeryviz.data_service.file_event_sink

import json
import logging
import aiofiles
from typing import List
from celeryviz.data_service.base import AbstractEventSink

logger = logging.getLogger(__name__)


[docs] class FileEventSink(AbstractEventSink): """File-based implementation of the AbstractEventSink.""" def __init__(self, file_path: str): if file_path.endswith('.jsonl') or file_path.endswith('.ndjson'): self.file_path = file_path logger.info(f"Initialized event storage with file: {file_path}") else: raise ValueError("File extension must be .jsonl or .ndjson")
[docs] async def dump_events(self, events: List[dict]): """Dump a list of event dictionaries to a file.""" json_lines = [json.dumps(event) for event in events] write_data = "\n".join(json_lines) + "\n" async with aiofiles.open(self.file_path, "a") as f: await f.write(write_data)