Quart, Redis, MySQL і Telegram: інсайти зі створення асинхронного API

Нещодавно переді мною стояла задача створити АПІ, яке генеруватиме рандомні тести з граматики англійської мови в залежності від вказаного рівня. Вирішила поділитися з вами своїми враженнями та думками щодо реалізації даного проекту, оскільки під час девелопменту постійно поривалася комусь описати свої рішення, знахідки, усвідомлення та висновки. Якщо такий порив є, то значить треба випускати.   

Як я вже сказала, АПІ мало повертати тести з англійської мови. Здавалося б, задача суперпроста і коротка…. Але, якщо придивитися, то можна побачити кілька важливих моментів.  По-перше, цілком логічно, що додаток не мав ходити в базу даних за тестами кожен раз, як тільки до нього звернуться.  Тому я використовувала додаткові інструменти для оптимізації цього процесу – наприклад, кеш. По-друге, важливим було продумати логіку генерації: тести, хоч і рандомні, однак користувачеві не сподобається, якщо вони повторюватимуться кожні 5 хвилин. По-третє, я вирішила зробити свій ерор хендлер, щоб він міг однаково працювати для всіх функцій і повертати саме такий текст помилок, який мені потрібно. Тут також застосувала декоратори, які в майбутньому можуть бути переписані для логування помилок у файл чи навіть в базу даних. Приклад:

def global_error_handler_async(func):

    """general handler for Internal Server Error logging of async functions"""

    @functools.wraps(func) 
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            log_raise_error(e, func)
    return wrapper


def log_raise_error(exception, func):

    """The function for errors detailed handling"""
    if not hasattr(exception, '_logged'): #check whether was logged already
        module_name = func.__module__
        function_name = func.__name__ 
        error_text = TxtData.ErrorArose.format(function_name, module_name, str(exception))
        try:
            logging.error(error_text)

        except Exception as e:  #if smth happend with the logger
            print(TxtData.LoggerError.format(error_text, str(e)))

        setattr(exception, '_logged', True)  
            #add a new attribute in order to mark the error as logged already. 
            #Otherwise we will see the same error for each function in the stack
        raise

Також після попередніх бекенд робіт я зрозуміла, що буквально закохана у Swagger, а тому вирішила заюзати розширення Quart-Schema:

Чому Quart???

Ви, напевно, зараз здивуєтесь, однак API я створювала за допомогою Quart. Чим керувалася, коли обирала саме цей фреймворк? Насправді, я планувала використовувати Flask і навіть використала його. Але для…. синхронної версії додатку. Коли справа доходить до асинхронності, то тут ми стикаємося з цікавими, а часом і складними челенжами.

Я сподіваюся, ви знайомі з поняттям асинхронності, але поясню. Уявіть, що якась операція займає досить багато часу (наприклад, читання з файлу чи запис у файл, читання з бази чи запис у базу тощо). Поки ця операція відбувається, наша програма може виконати якийсь інший процес, а потім повернутися і завершити її. Не думаю, що для мого проекту на початковому етапі асинхронність була конче необхідною, однак на перспективу і для досвіду я вирішила її реалізувати.

Знаючи, що вже вийшов Flask 2.0 з його новим асинхронним функціоналом, я думала скористатися саме ним. Проте один  абзац з документації

наштовхнув мене на роздуми. Перейшла за посиланням, відкрила документацію Quart і що я бачу? Фреймворк, який нагадує мені FastApi (саме з ним я починала бекенд розробку). За що люблю FastApi? За лаконічність і міцну взаємодію з Pydantic – бібліотекою, що максимально спрощує і пришвидшує роботу з різними структурами даних:

class GettedTests(BaseModel):   
    #the model for validation and processing of getted tests from the db 
    ID: Annotated[int, Field(gt=0)]
    Question:  Annotated[str, Field(min_length=3)]
    Options: list[OptionsTest]
    correct_option_id: Annotated[int, Field(ge=0)]
    explanation: Annotated[str, Field(min_length=3)]
    datetime_shown: Annotated[Union[str, datetime, None], Field(default=None)]

class ToValidateLevel(BaseModel):    #the model for input data validation in the gettests route
    Level: Annotated[str, Field(min_length=2)]


@eng_bp.route(TxtData.GetTestRoute, methods=["GET"])    #/testroutes/gettests
@validate_querystring(ToValidateLevel)
@validate_response(GettedTests, 200)
async def GetTests(query_args: ToValidateLevel):
    pass

Тобто ми створюємо моделі і одразу прописуємо їх у ендпоїнті без потреби самостійно перевіряти дані, які в цей ендпоїнт прийшли. Крім того, такі моделі можна використовувати всюди для різноманітних операцій, не прописуючи текстом назви полів, до яких звертаєшся.

Також мені довелося попрацювати з Redis. Напевно, ви розумієте, що постійно звертатися до бази за новою порцією даних — не найкраща ідея. Це створює великі навантаження  та уповільнює роботу. Уявіть, що ми матимемо тисячі користувачів, які одного прекрасного вечора вирішать засісти за вивчення англійської. Отже, як на мене, хорошою ідеєю було ходити в базу періодично, забираючи звідти тести пачками по n штук, де n – число, яке ми вказуємо у нашому файлі конфігурації. Де зберігати ці n тестів? Звичайно, у кеші. І Redis (а, якщо точніше, бібліотека aioredis) для цього підійшов якнайкраще.

Нарешті, база даних. Якщо чесно, поки мені доводилося працювати лише з двома – SQL та MySQL – тому я вирішила обрати одну з них, а саме MySQL. Для взаємодії з нею я встановила SQLAlchemy – досить велику бібліотеку, що дозволяє представляти таблиці у вигляді моделей, написаних мовою Python, уникати написання запитів до бази напряму в коді, забезпечувати сек’юрність тощо. Також використовувала Alembic, оскільки він уміє моментально створювати таблиці в базі даних, маючи «на руках» ту модель, яку ми описали. Це чудовий інструмент для міграції, який здатен зберігати версії наших змін. Ми можемо конролювати, які таблички, поля, властивості додавали чи видаляли, а також – відкатувати свої міграції у разі необхідності.

Загалом тести мали розміщуватися у двох табличках – Questions та Options. Мені здається, тут все очевидно: перша таблиця містить текст завдання, а друга – варіанти відповідей. SQLAlchemy дозволила мені подати їх у вигляді моделей:

class Questions(Base):

    """the main table with questions list. 
    datetime_shown is for reflecting the time when a question was shown to a user last time"""

    __tablename__ = 'questions'

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index= True, autoincrement=True)
    level: Mapped[Enum] = mapped_column(Enum(Levels), nullable=False)
    question: Mapped[Text] = mapped_column(String(400), nullable=False)
    correct_id: Mapped[int] = mapped_column(Integer, nullable=False)
    explanation: Mapped[Text] = mapped_column(String(500))
    datetime_shown: Mapped[DateTime] = mapped_column(DateTime, nullable=True)


class Options(Base):

    """the table with possible answers"""

    __tablename__ = 'options'

    question_id: Mapped[int] = mapped_column(Integer,  ForeignKey('questions.id'), primary_key=True)
    option_id: Mapped[int] = mapped_column(Integer, primary_key=True)
    option_text: Mapped[Text] = mapped_column(String(200), nullable=False)

    __table_args__ = (
        Index('ix_question_option', 'question_id', 'option_id', unique=True),
    )

які перетворювалися в SQL таблиці після запуску міграції.

Уявіть, ми створили цей додаток, проте стикаємося з новою цікавою задачею: уникнути повторної генерації тестів. Тоді як їх генерувати? Як знати, чи тест вже було згенеровано? Коли саме користувач його побачив? Звичайно, в певний момент повторення таки відбудеться, однак лише після того, як користувач пройде всі інші тести обраного рівня. Ми ж не хочемо відповідати на одне і те саме питання по 10 разів за вечір. Тут я зрозуміла, що можна піти двома шляхами.

1) Записувати час генерації тесту в базу даних в той момент, коли ми цей тест звідти взяли.

2) Записувати час генерації тесту в кеш в той момент, коли ми цей тест показали на фронтенді (наприклад), а потім – переносити дату в базу даних. Тут доведеться використовувати додатковий ендпоїнт, через який фронтенд передаватиме нам цей час.

Для чого нам час? Щоб сортувати дані у зворотному порядку і в першу чергу брати лише ті тести, які були згенеровані найраніше або ще не генерувалися взагалі. Серед двох описаних варіантів я обрала другий, давши фронтенду самостійно повідомляти, коли він показав тест користувачеві, однак перший шлях теж міг би спрацювати.  

Звичайно, були і складнощі…

  • Кеш періодично мав очищуватися у двох випадках: АПІ згенерувало усі його тести або користувачі давно не зверталися до АПІ. Розуміючи, що перед очищенням кешу дату генерації тесту потрібно записувати в базу даних, я вирішила додати так звану фонову задачу (background task), що мала працювати паралельно з основним додатком і своєчасно оновлювати дані в базі. Однак, перед цим, я витратила певний час у спробі знайти вже існуючі інструменти, які дозволять задати виконання певних операцій перед тим, як кеш очиститься. На жаль, те, що я знаходила, не відпрацьовувало з різних причин і додавання background task найкраще задовольнило мої потреби.

Особливо цікавим і непростим виявилося написання тестів для фонової задачі. Там довелося придумувати схему, як імітувати очищення кешу, щоб перевірити, чи дійсно дані заходять в базу. Звичайно, я «мокала» деякі параметри, оскільки інакше цей тест відпрацьовував би досить довго:

mocker.patch('config.settings.CACHE_DEFAULT_TIMEOUT', 6)
mocker.patch('config.settings.CACHE_CHECK_TIMEOUT', 3)
mocker.patch.object(redis_client_test, 'ttl', return_value=asyncio.Future())
redis_client_test.ttl.return_value.set_result(settings.CACHE_CHECK_TIMEOUT - 1)
        #replace current ttl in order to do the process quicker

І, крім того, в асинхронному варіанті потрібно було трохи погратися з циклами подій, які мали своєчасно закритися.

  • Інша непроста історія виникла, коли запускалася міграція Alembic. Як я писала вище, для реалізації проекту мені знадобилося дві таблички. Тут перед Алембіком постало складне питання: що з’явилося першим – «курка чи яйце», – оскільки поле question_id є зовнішнім ключем для таблиці Options, а correct_id – для таблиці Questions. Скажу відверто, в кінці мені довелося згенерувати файл міграції без одного зовнішнього ключа, а потім дописати код для його створення в цей файл.

Це ще не все?

Дві версії АПІ – не єдине, що було написано. Також я створила телеграм бот, який використовував все, що повертає моє АПІ:

Для його реалізації мені знадобилися бібліотеки aiogram, aiohttp, aioredis тощо. Робота з кешем велася для того, щоб на певний час запам’ятовувати ID користувачів. Це дозволяло програмі зрозуміти, чи користувач «новий» і потребує вступного слова та клавіатури зі списком рівнів, чи «старий» і може продовжувати з того моменту, на якому зупинився. 

Якщо цікаво, деталі всіх трьох проектів (синхронного, асинхронного АПІ і телеграм бота) можна знайти на моїй сторінці в гітхаб, яку я прикріпляю нижче. А якщо не цікаво – ну що ж, буває і таке 🙂  Сподіваюся, хтось з вас надихнувся на створення чогось нового, або почерпнув якусь нову ідею.

Асинхронна версія додатку – https://github.com/yahrdev/EnGram_async

Синхронна версія додатку – https://github.com/yahrdev/EnGram_sync

Телеграм бот – https://github.com/yahrdev/EnGram_bot