FastAPI

FastAPI with MongoDB – Build Scalable Python APIs

Combining FastAPI with MongoDB lets developers build efficient, modern web applications using an asynchronous, NoSQL-based approach.

 

 

Some example MongoDB queries are shown below:

from bson.objectid import ObjectId
from database.mongodb import MongoDB
from app.user.models import User,UserInOut
from pydantic.json import pydantic_encoder


async def create_user(db: MongoDB, user: User) -> ObjectId:
    """Insert ``user`` into the "Users" collection.

    The model is serialised with ``model_dump()`` and handed to
    ``insert_one``; the ``inserted_id`` reported by the insert result is
    returned to the caller.
    """
    document = user.model_dump()
    users = db.mongo_db["Users"]
    result = await users.insert_one(document)
    return result.inserted_id


async def read_user(db: MongoDB, user_id: str):
    """Look up a single document in "Users" by its ``_id``.

    ``user_id`` is converted with ``ObjectId(user_id)`` before querying.
    Returns a ``UserInOut`` built from the document's fields, or ``None``
    when ``find_one`` yields nothing.
    """
    users = db.mongo_db["Users"]
    document = await users.find_one({"_id": ObjectId(user_id)})
    if not document:
        return None
    # The raw document is splatted into the model as keyword arguments.
    return UserInOut(**document)


async def all_users(db: MongoDB) -> list[UserInOut]:
    """Return every user in "Users" that is not flagged as deleted.

    A document qualifies when its ``deleted`` field is ``False`` or is
    absent altogether. Only ``_id``, ``name`` and ``email`` are projected.
    Each document is wrapped in ``UserInOut`` (with ``_id`` stringified as
    ``id``) and then passed through ``pydantic_encoder``.

    NOTE(review): the list elements are whatever ``pydantic_encoder``
    returns for a model (presumably plain dicts), not ``UserInOut``
    instances as the annotation suggests -- confirm against callers before
    tightening either side.
    """
    selected_fields = {"_id": 1, "name": 1, "email": 1}
    # The filter must be a plain mapping. A trailing comma after the closing
    # brace would turn it into a 1-tuple, which the driver rejects.
    query = {
        "$or": [
            {"deleted": False},  # explicitly not deleted
            {"deleted": {"$exists": False}},  # flag never set
        ],
    }
    data = await db.mongo_db["Users"].find(query, selected_fields).to_list(length=None)
    return [
        pydantic_encoder(UserInOut(id=str(d["_id"]), name=d["name"], email=d["email"]))
        for d in data
    ]


async def update(db: MongoDB, user_id: str, user: User) -> bool:
    """Overwrite the stored fields of one user with the values in ``user``.

    The dumped model is applied with ``$set`` to the document whose ``_id``
    equals ``ObjectId(user_id)``. Returns ``True`` only when the update
    result reports ``modified_count > 0``.

    NOTE(review): a matching document whose values are already identical
    presumably reports zero modifications and so yields ``False`` --
    confirm whether callers treat that as "not found".
    """
    changes = user.model_dump()
    users = db.mongo_db["Users"]
    selector = {"_id": ObjectId(user_id)}
    result = await users.update_one(selector, {"$set": changes})
    modified = result.modified_count
    return modified > 0


async def delete(db: MongoDB, user_id: str) -> bool:
    """Delete the document in "Users" whose ``_id`` is ``ObjectId(user_id)``.

    Returns ``True`` when the delete result reports ``deleted_count > 0``.

    NOTE(review): this calls ``delete_one``, so the document is removed
    rather than marked with the ``deleted`` field that ``all_users``
    filters on.
    """
    users = db.mongo_db["Users"]
    selector = {"_id": ObjectId(user_id)}
    result = await users.delete_one(selector)
    removed = result.deleted_count
    return removed > 0

 


About author

author image

Amrit panta

Fullstack developer, content creator



Scroll to Top