Коли хочеш конвертувати mp4 у wav, а виходить SOLID

Знаєте, нині я все частіше зустрічаю ідею «не важливо, як ти напишеш код, як побудуєш архітектуру всередині додатка, не треба робити все надто добре, головне – результат». В деяких місцях (MVP, як варіант) така позиція має сенс, але дана стаття явно не про неї і явно не для тих, хто звик отримувати задоволення виключно від результату, а не від процесу 🙂

Ніякого знущання, просто мем в тему

Нещодавно я завершила створення одного з мікросервісів в рамках свого проекту. Якщо чесно, не думала, що ця творчість затягнеться надовго. Вже в ході розробки мені стало ясно: є можливість зробити не «абияк», а таким чином, щоб написаний сервіс лишився зі мною «на роки», на всі випадки життя та демонстрував зростання, а не тільки результат, який юзається для конкретної задачі і на цьому все.

Спочатку мікросервіс мав конвертувати відео в аудіо – виключно одна нескладна задача в рамках великого проекту з мікросервісною архітектурою. Але в певний момент я подумала, чому б не подивитися на це завдання ширше, якщо є така можливість? Досить швидко ідея виросла до більших масштабів і перетворилася на перспективний міні-проект, який дозволяє закласти крутий фундамент для чогось нового. Напевно, я опинилася в другому стані:

Які пункти додалися до простого «отримати аудіо і піти далі»?

  • Масштабованість. Тепер в перспективі я розглядала розширення мікросервісу і додавання інших типів конвертацій – «pdf у txt», «word у pdf», «png у jpg» тощо. Мені хотілося написати так, щоб він міг використовуватися не лише в одному проекті…
  • SOLID принципи.  Саме ті принципи, що дозволяють створити архітектуру «на роки». Архітектуру, яка не вимагає постійних змін, а може легко масштабуватись без «я додав код і все впало».
  • Гнучкість. Даний мікросервіс може бути використаний ще у дуже багатьох проектах, які коли-небудь доведеться робити, і при цьому не потрібно буде в нього вносити суттєві зміни (а може і взагалі діло обійдеться без змін).

В результаті, мені знадобилося близько 25-ти файлів… «Ти божевільна!» – скажете ви. Проте, насправді, лякає тільки цифра. Важливим було розподілити все так, щоб кожен файл відповідав тільки за один процес (SRP ж ніхто не відміняв). Для маленьких проектів це не обов’язково, але уявіть, що проект виріс. Що тоді?

Нижче я розкажу про деякі важливі моменти зі свого міні-проекту, так сказати, але повну версію ви можете глянути тут

File Manager

Оскільки на початку не було остаточно відомо, де саме доведеться розгортати мій проект, я зрозуміла, що працювати з вхідним і вихідним файлом потрібно залежно від ситуації. Тому список функцій, які створюють та зберігають файли, було вирішено винести в окремий клас LocalStorageFileManager і в принципі зробити все так, щоб в майбутньому, коли доведеться працювати з S3, FTP / SFTP тощо, можна було просто додати новий клас і піти далі, а не сидіти копатися в коді, вносячи зміни. Тому діло не обійшлося без базового класу, на який мали спиратися всі дочірні:

import os
import shutil
from typing import Tuple, Optional, List
from typing_extensions import Literal
import uuid
from abc import ABC, abstractmethod
import platform
from app.services.error_logger import log_exceptions
from app.exceptions.exceptions import AppError


class FileManager(ABC):

    def __init__(self, settings):
        self._settings = settings

    @abstractmethod
    def get_tmp_dir(self, type: Literal["input", "output"]) -> str:

        """Get or create a temporary directory."""

        pass
        
    @abstractmethod
    def create_tmp_file_path(self, extension: str, type: Literal["input", "output"], filename = None) -> Tuple[str, str]:
        """
        Returns:
            Tuple[tmp_path, filename (without extension)]
        """
        pass

    @abstractmethod
    def save(self, filepath: str, filename: str) -> str:

        """Save to the folder used for serving the file to a user."""

        pass
        
    @abstractmethod
    def clear_tmp_folders(self, folders: Optional[List[str]] = None):

        """Clear temporary folders after processing."""

        pass

    @abstractmethod
    def get_ffmpeg_path(self) -> str:

        """Additional exe files for the conversions"""

        pass


class LocalStorageFileManager(FileManager):

    """
    The class for managing files on local storage (e.g., D:/).
    """

    def __init__(self, settings):
        super().__init__(settings)

    @log_exceptions
    def get_tmp_dir(self, type: Literal["input", "output"]) -> str:
        workdir = ''
        if type == "input":
            workdir = self._settings.TEMP_DIR_INPUT
        if type == "output":
            workdir = self._settings.TEMP_DIR_OUTPUT
        if not type in ["input", "output"]:
            raise AppError("WRONG_TYPE", type = type)
        os.makedirs(workdir, exist_ok=True) #create workdir
        return workdir
        
    def create_tmp_file_path(self, extension: str, type: Literal["input", "output"], filename = None) -> Tuple[str, str]:
        """
        Returns:
            Tuple[tmp_path, filename (without extension)]
        """
        workdir = self.get_tmp_dir(type)
        if filename is None:
            filename = f"{uuid.uuid4()}"
        tmp_path = os.path.join(workdir, f"{filename}.{extension}")
        return tmp_path, filename  

    @log_exceptions
    def save(self, filepath: str, filename: str) -> str:
        media_folder = self._settings.MEDIA_FOLDER
        os.makedirs(media_folder, exist_ok=True) 
        target_path = os.path.join(media_folder, filename)
        shutil.copy(filepath, target_path)
        return f"{self._settings.MEDIA_DIR.rstrip('/')}/{filename}"
        

    def clear_tmp_folders(self):
        for filename in os.listdir(self._settings.TEMP_DIR_OUTPUT):
            filepath = os.path.join(self._settings.TEMP_DIR_OUTPUT, filename)
            try:
                if os.path.isfile(filepath) or os.path.islink(filepath):  #if it is a file or a symlink
                    os.remove(filepath)
                elif os.path.isdir(filepath):  #if it is a directory
                    shutil.rmtree(filepath)
            except Exception as e:
                print(f"Clearing error {filepath}: {e}")

    def get_ffmpeg_path(self) -> str:
        ffmpeg_binary = "ffmpeg.exe" if platform.system() == "Windows" else "ffmpeg"
        custom_path = os.path.join(self._settings.TOOLS_FOLDER, ffmpeg_binary)

        if os.path.exists(custom_path):
            return custom_path
        else:
            return ffmpeg_binary  # use system ffmpeg
    

file_manager_registry = {
    "local": LocalStorageFileManager
}

@log_exceptions
def get_file_manager(settings) -> FileManager:
    manager_class = file_manager_registry.get(settings.STORAGE_TYPE)
    if not manager_class:
        raise AppError("FILE_MANAGER_NOT_IMPLEMENTED", type=settings.STORAGE_TYPE)
    return manager_class(settings)

Convertor

Ключовий модуль мікросервісу також довелося розширити. І, мені здається, тут ідею пояснювати не обов’язково, оскільки вище я казала, що в перспективі цей мікросервіс може почати виконувати більше різноманітних конвертацій. Саме тому я також створила базовий і дочірній класи:

from abc import ABC, abstractmethod
import magic
from pydub import AudioSegment
from tqdm import tqdm
from app.core.file_manager import get_file_manager
import os
from app.services.error_logger import log_exceptions
from app.exceptions.exceptions import AppError


class Convertor(ABC):

    def __init__(self, settings, file_manager=None):
        self.file_manager = file_manager or get_file_manager(settings)
        self._settings = settings

    @classmethod
    @abstractmethod
    def supports(cls):

        """List of supported (from, to) extension pairs."""

        pass

    @abstractmethod
    def convert(self, input_path: str, extension_from: str, filename: str, extension_to: str) -> str:
        """
            Returns:
            str: Full path to the converted output file.
        """
        pass

    def precheck_file(self, path: str, expected_ext: str):

        """ Validations before the conversion."""

        if not os.path.exists(path):
            raise AppError("FILE_NOT_FOUND_PATH", path=path)
        ext = extract_extension(path)
        if ext != expected_ext:
            raise AppError("EXTENSION_MISMATCH", ext=ext, expected=expected_ext)


class ConvertVideoAudio(Convertor):

    """Converts video to audio"""

    def __init__(self, settings, file_manager=None):
        super().__init__(settings, file_manager)

    @classmethod
    def supports(cls):
        return [
            ("mp4", "mp3"),
            ("mp3", "wav"),
            ("mp4", "wav"),
            ("wav", "mp3"),
        ]

    @log_exceptions
    def convert(self, input_path: str, extension_from: str, filename: str, extension_to: str) -> str:
 
        self.precheck_file(path = input_path, expected_ext=extension_from)
        SHOW_PROGRESS = self._settings.DEBUG_PROGRESS.lower() == "true"

        self.check_mediafile(input_path)
        (output_path, _) = self.file_manager.create_tmp_file_path(extension_to, type="output", filename=filename)
        chunk_size = 10_000
        audio = AudioSegment.from_file(input_path)
        AudioSegment.converter = self.file_manager.get_ffmpeg_path()

        # General duration
        total_duration = len(audio)

        # Init empty object
        processed_audio = AudioSegment.empty()

        # For showing progress on backend (for debug only)
        iterator = range(0, total_duration, chunk_size)
        if SHOW_PROGRESS:
            iterator = tqdm(iterator, desc="Convertation", unit="chunk")

        # Process video by parts
        for start in iterator:
            end = min(start + chunk_size, total_duration)
            chunk = audio[start:end]  
            processed_audio += chunk  # Cut a part and add to the final audio object

        # export to the file
        with open(output_path, "wb") as f:
            processed_audio.export(f, format=extension_to)
        if SHOW_PROGRESS:
            print("\n Finished:", output_path)
        return output_path
        
    @log_exceptions
    def check_mediafile(self, path: str):

        """check whether the file is audio or video"""

        mime = magic.from_file(path, mime=True)       
        if not (mime.startswith("audio/") or mime.startswith("video/")):
            raise AppError("FILE_WRONG_TYPE", path = path)
        

@log_exceptions
def get_convertator(extension_from: str, extension_to: str, settings) -> Convertor:
    from app.core.conversion_registry import ConversionRegistry
    convertor_cls = ConversionRegistry.get_convertor(extension_from, extension_to)
    if not convertor_cls:
        raise AppError("WRONG_CONVERSION", from_ext = extension_from, to_ext = extension_to)
    return convertor_cls(settings=settings)


def extract_extension(path: str) -> str:

    """extract extension from the provided path"""

    ext = os.path.splitext(path)[1]
    ext = ext.lstrip(".")
    if ext == "":
        raise AppError("INVALID_FILE_PATH", path = path)
    return ext

Conversion Registry

Уявіть, ви пишете універсальний конвертор і у вас вже є досить чималий список дозволених конвертацій. Не знаю, як ви, однак я волію зберігати такий список в одному місці, де зможу додавати нові операції без додавання їх ще десь. Саме тому виникла ідея створити цей регістр. До речі, саме так ми забезпечуємо виконання одного з SOLID принципів, а саме Open/Closed (OCP) принцип, коли модуль має бути відкритий для розширення, але не для модифікації:

from app.core.convertor import Convertor
from typing import Type

class ConversionRegistry:
    _mapping = None

    @classmethod
    def get_mapping(cls) -> dict:
        """
        Gets a dict of all conversions that are supported by Convertor classes. 
        {
            ("mp4", "mp3"): ConvertVideoAudio,
            ("pdf", "txt"): ConvertPDF,
            ...
        }

        """
        if cls._mapping is None:
            cls._mapping = {}

        for convclass in Convertor.__subclasses__():
            for pair in convclass.supports():
                cls._mapping[pair] = convclass

        return cls._mapping
    

    @classmethod
    def get_convertor(cls, from_ext: str, to_ext: str) -> Type[Convertor]:
        """
        Gets class for specified pair
        """
        mapping = cls.get_mapping()
        return mapping.get((from_ext.lower(), to_ext.lower()))
    

    @classmethod
    def get_allowed_conversions_list(cls) -> list:
        """
        [
            ("mp3", "wav"),
            ("mp4", "mp3"),
            ...
        ]
        """
        return sorted(cls.get_mapping().keys())
    
    @classmethod
    def get_supported_extension_set(cls) -> set:
        return {ext for pair in cls.get_mapping() for ext in pair}

На цьому можна було б завершувати, однак мова йде про окремий мікросервіс, що повинен приймати вхідний файл і повертати вже конвертований. Очевидно, до вище написаного коду довелося додати ендпоїнти і весь необхідний інший код для створення API, обробку помилок, документацію іііі, звичайно ж, логер.

Логер та обробка помилок

Очевидно, можна було користуватися готовими рішеннями для таких речей. Проте в результаті логи і помилки виглядали явно не так, як мені потрібно: то чогось не вистачало, то формат не той, то інформація не вся.

В перспективі цей проект матиме окремий (скоріш за все, теж універсальний) мікросервіс для зберігання логів. А тому я писала код таким чином, щоб потім можна було викликати сервіс для логів і писати все туди. Перед таким викликом, очевидно, логи потрібно зібрати і сформувати json, тому з’явився цей файл:

import traceback
import uuid
import platform
from datetime import datetime
from flask import request, has_request_context
import json
import functools
import os

def log_exception(exc: Exception, func: callable = None, http_status: int = None, http_message: str = None):
    DISABLE_LOGGING = os.getenv("DISABLE_ERROR_LOGGING") == "1"
    if not DISABLE_LOGGING:
        error_dict = get_error_log_dict(exc, func, http_status, http_message)

        # for displaying in the terminal
        print("LOGGED ERROR:")   
        print(json.dumps(error_dict, indent=2, ensure_ascii=False))

def get_error_log_dict(exc: Exception, func: callable = None, http_status: int = None, http_message: str = None) -> dict:
    error_id = str(uuid.uuid4())

    # Get and truncate the traceback
    full_traceback = ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    short_traceback = full_traceback[-3000:] if len(full_traceback) > 3000 else full_traceback

    # Get info about the original error 
    origin_info = {}
    tb = traceback.extract_tb(exc.__traceback__)
    if tb:
        origin_frame = tb[-1]
        origin_info = {
            "origin_file": origin_frame.filename,
            "origin_function": origin_frame.name,
            "origin_line": origin_frame.lineno
        }

    # The location where log_exception was called
    log_context = {}
    if func:
        log_context["module"] = func.__module__
        log_context["function"] = func.__name__
        if hasattr(func, '__self__') and hasattr(func.__self__, '__class__'):
            log_context["class"] = func.__self__.__class__.__name__

    # Create a JSON structure
    log_data = {
        "id": error_id,
        "timestamp": datetime.utcnow().isoformat(),
        "error_type": type(exc).__name__,
        "error_message": str(exc),
        "traceback": short_traceback,
        "system": {
            "platform": platform.platform(),
            "python_version": platform.python_version(),
        },
        **origin_info,
        **log_context,
    }

    # HTTP part
    if http_status:
        log_data["http_status"] = http_status
    if http_message:
        log_data["http_error_message"] = http_message

    # Data about the HTTP request (if any)
    if has_request_context():
        try:
            json_data = None
            try:
                json_data = request.get_json(silent=True)
            except Exception as e:
                json_data = f"<< Failed to parse JSON: {e} >>"

            log_data["request"] = {
                "method": request.method,
                "url": request.url,
                "remote_addr": request.remote_addr,
                "user_agent": request.headers.get("User-Agent"),
                "json": json_data
            }

        except Exception as e:
            log_data["request"] = f"<< Failed to collect request data: {e} >>"

    return log_data



def log_exceptions(func):
    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):

        """
        The decorator to apply to different functions throughout the service and gather error information.
        """

        try:
            return func(*args, **kwargs)
        except Exception as e:
            raise

    return sync_wrapper

Виключення хотілося зробити не просто зрозумілими, а й контрольованими. Для мене контрольованість – це коли все зведено в одну точку, де ми можемо вносити зміни, а також – універсальний шаблон, за яким працює весь додаток. Так знижується ймовірність щось загубити чи зламати. Саме тому я використала декоратор log_exceptions, який «чіпляється» до різноманітних функцій, логує виключення та дозволяє їх надалі обробити централізовано.

Формат самих виключень також був універсальним. Для цього я створила файл exceptions, який не мав надто багато коду, однак робив магію, повертаючи саме те, що мені потрібно:

from app.exceptions.errors import ERROR_MESSAGES

class AppError(Exception):
    def __init__(self, code, **kwargs):
        self.code = code
        self.message = ERROR_MESSAGES.get(code).format(**kwargs)
        super().__init__(self.message)

В файл errors я помістила довідник виключень з кастомними кодами, щоб потім у документації чітко розписати, що який код означає:

Так мікросервіс почав спілкуватися доступною мовою і повертати не «сферичного коня у вакуумі», а достатньо зрозумілі речі, що можуть бути оброблені в іншому мікросервісі і передані далі. Щоб не було, як в тому мемі:

Тестування

Не буду надто детально розповідати про тестування. Скажу лише, що тут мені знадобилося кілька папок з інтеграційними і юніт тестами, а також – усіма необхідними даними для них. Я робила все для того, щоб інтеграційні тести були універсальними для всіх конвертацій. Єдине, що доведеться додати – це дані для тестування конкретного конвертора та кілька функцій, які створюватимуть тестові файли з відповідними розширеннями.

Скажу відверто, від процесу написання додатка в такому форматі я отримала неймовірне задоволення. Здавалося, що я створюю щось добре структуроване, перспективне і масштабне, навіть якщо задача була досить простою. Звичайно, у досконалості немає меж, і, впевнена, знайдуться нюанси, які ще можна покращити. Однак для перфекціоніста важливо що? Своєчасно зупинитися і все-таки видати результат, а тому в певний момент довелося визнати цей мікросервіс завершеним 🙂