#StackBounty: #python #mongodb #mocking #pytest #python-asyncio Testing for MongoDB Functionality using Motor AsyncIO and Pytest

Bounty: 50

I am trying to write tests for functions that use an async MongoDB connection. I connect to MongoDB using Motor with asyncio, and I need help mocking the Motor connection.

My Code:

commons.py

mongo = None

blacklist.py

import commons

class Blacklist(object):
    async def check_if_blacklisted(self, word: str):
        blacklisted = False
        if await commons.mongo.blacklist.find_one({'word': word}):
            blacklisted = True
        return blacklisted

main.py

import asyncio
import commons
from blacklist import Blacklist
from motor.motor_asyncio import AsyncIOMotorClient

async def run():
    commons.mongo = AsyncIOMotorClient("mongodb://localhost", io_loop=asyncio.get_event_loop())
    blacklist_checker = Blacklist()
    result = await blacklist_checker.check_if_blacklisted(word="should_be_false")
    print(result)
    # > False

    result = await blacklist_checker.check_if_blacklisted(word="should_be_true")
    print(result)
    # > True

loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()

I now want to test blacklist.py by mocking the Motor connection, but I cannot get the test to run properly. Here is what I have tried:

test_blacklist.py

import commons
import motor.motor_asyncio
import pytest
from blacklist import Blacklist

class TestBlacklist(object):

    @pytest.fixture
    async def motor(self, event_loop):
        # I know I'm not mocking the Motor Connection here,
        # but just wanted to show you the output using this fixture.
        commons.mongo = motor.motor_asyncio.AsyncIOMotorClient(io_loop=event_loop)
        yield commons.mongo
        commons.mongo.close()

    @pytest.mark.asyncio
    async def test_check_if_blacklisted(self):
        blacklist_checker = Blacklist()
        blacklisted = await blacklist_checker.check_if_blacklisted(word="should_be_false")
        assert blacklisted == False
        # > AttributeError: 'NoneType' object has no attribute 'blacklist'

pytest-mongodb:

import pytest
from unittest.mock import patch
from blacklist import Blacklist

class TestBlacklist(object):

    @pytest.mark.asyncio
    async def test_check_if_blacklisted(self, mongodb):
        with patch("blacklist.commons.mongo") as db:
            db = mongodb
            blacklist_checker = Blacklist()
            blacklisted = await blacklist_checker.check_if_blacklisted(word="should_be_false")
        assert blacklisted == False
        # > TypeError: object MagicMock can't be used in 'await' expression
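
For reference, the TypeError above is what Python raises when the result of calling a plain MagicMock is awaited. Also, assigning to the name bound by `with patch(...) as db` only rebinds a local variable; it does not change what commons.mongo points to. One possible way around both issues, sketched here with only the standard library (Python 3.8+), is to build the replacement first and hand it to the patcher, with find_one backed by an AsyncMock. The `commons` stand-in, the simplified `Blacklist` copy, and the `make_fake_mongo` helper are assumptions made so the sketch runs on its own; they are not part of Motor or pytest.

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch

# Hypothetical stand-in for commons.py so the sketch runs without MongoDB.
commons = SimpleNamespace(mongo=None)


class Blacklist:
    async def check_if_blacklisted(self, word: str) -> bool:
        # Same lookup as blacklist.py in the post.
        return bool(await commons.mongo.blacklist.find_one({'word': word}))


def make_fake_mongo(blacklisted_words):
    """Return a mock whose blacklist.find_one can be awaited (hypothetical helper)."""
    async def fake_find_one(query):
        word = query['word']
        return {'word': word} if word in blacklisted_words else None

    fake = MagicMock()
    fake.blacklist.find_one = AsyncMock(side_effect=fake_find_one)
    return fake


async def demo():
    fake = make_fake_mongo({'should_be_true'})
    # Pass the replacement to the patcher; rebinding the name it yields
    # would leave commons.mongo unchanged.
    with patch.object(commons, 'mongo', fake):
        checker = Blacklist()
        return (
            await checker.check_if_blacklisted('should_be_false'),
            await checker.check_if_blacklisted('should_be_true'),
        )


results = asyncio.run(demo())
print(results)  # (False, True)
```

In a real test module the same idea would presumably target the actual commons module, for example by patching "blacklist.commons.mongo" with the fake passed as the second argument.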

I searched online but could not find a thread that shows how to run such a test while mocking the async Motor connection. If you think my approach to testing is wrong, please let me know; I am new to writing tests, especially with async database connections.

Note: blacklist.py has several functions that need MongoDB, so ideally test_blacklist.py would initialize commons.mongo once and every subsequent test would reuse it.
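
A rough sketch of that "set it up once, reuse it in every test" idea follows. To keep it runnable with the standard library alone, it swaps pytest for `unittest.IsolatedAsyncioTestCase` (Python 3.8+) and does the patching in `setUp`; with pytest, the same setup would likely live in an autouse fixture instead. The `commons` stand-in and the simplified `Blacklist` copy are assumptions for illustration only.

```python
import unittest
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch

# Hypothetical stand-in for commons.py so the sketch runs without MongoDB.
commons = SimpleNamespace(mongo=None)


class Blacklist:
    async def check_if_blacklisted(self, word: str) -> bool:
        return bool(await commons.mongo.blacklist.find_one({'word': word}))


class TestBlacklist(unittest.IsolatedAsyncioTestCase):
    def setUp(self):
        # Runs before every test: install a fake connection on commons.mongo.
        fake = MagicMock()
        fake.blacklist.find_one = AsyncMock(return_value=None)
        patcher = patch.object(commons, 'mongo', fake)
        self.mongo = patcher.start()
        # Restore the original value after each test.
        self.addCleanup(patcher.stop)

    async def test_not_blacklisted(self):
        self.assertFalse(await Blacklist().check_if_blacklisted('should_be_false'))

    async def test_blacklisted(self):
        # Individual tests adjust what the shared fake returns.
        self.mongo.blacklist.find_one.return_value = {'word': 'should_be_true'}
        self.assertTrue(await Blacklist().check_if_blacklisted('should_be_true'))
        self.mongo.blacklist.find_one.assert_awaited_once_with({'word': 'should_be_true'})


suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestBlacklist)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
```

Each test gets a fresh fake, so return values set in one test do not leak into another.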

