Please Subscribe To My YouTube Channel Subscribe Now!

How to add MongoDB database in any Repo

Please wait 0 seconds...
Scroll Down and click on Go to Link for destination
Congrats! Link is Generated

MongoDB

MongoDB is a NoSQL database commonly used for Telegram bots to store user data and configurations. It offers a flexible schema and efficient querying, making it ideal for dynamic data. With libraries like pymongo, developers can easily integrate it into their bots.

Follow the below steps to add MongoDB database in any repository

1. Add the code given below in your "requirements.txt" file

motor
2. Now Enter the following Variables in your "config.py" file. File name may be differ from repo to repo.
DB_URI = os.environ.get("DB_URI", "")
DB_NAME = os.environ.get("DB_NAME", "TechifyBots")

LOG_CHANNEL = int(os.environ.get("LOG_CHANNEL", ""))
3. Create a new file named "database.py" and enter the code given below
from typing import Any
from config import DB_URI, DB_NAME
from motor import motor_asyncio

client: motor_asyncio.AsyncIOMotorClient[Any] = motor_asyncio.AsyncIOMotorClient(DB_URI)
db = client[DB_NAME]

class Techifybots:
    def __init__(self):
        self.users = db["users"]
        self.cache: dict[int, dict[str, Any]] = {}

    async def add_user(self, user_id: int, name: str) -> dict[str, Any] | None:
        try:
            user: dict[str, Any] = {"user_id": user_id, "name": name}
            await self.users.insert_one(user)
            self.cache[user_id] = user      
            return user
        except Exception as e:
            print("Error in addUser: ", e)

    async def get_user(self, user_id: int) -> dict[str, Any] | None:
        try:
            if user_id in self.cache:
                return self.cache[user_id]
            user = await self.users.find_one({"user_id": user_id})
            return user
        except Exception as e:
            print("Error in getUser: ", e)
            return None

    async def get_all_users(self) -> list[dict[str, Any]]:
        try:
            users: list[dict[str, Any]] = []
            async for user in self.users.find():
                users.append(user)
            return users
        except Exception as e:
            print("Error in getAllUsers: ", e)
            return []

    async def delete_user(self, user_id: int) -> bool:
        try:
            result = await self.users.delete_one({"user_id": user_id})
            self.cache.pop(user_id, None)
            return result.deleted_count > 0
        except Exception as e:
            print("Error in delete_user: ", e)
            return False

tb = Techifybots()
4. Now add this code in your start command and don't forget to import necessary modules
if await tb.get_user(message.from_user.id) is None:
        await tb.add_user(message.from_user.id, message.from_user.first_name)
        await client.send_message(LOG_CHANNEL, text="#New_User\n\nUser: {}\nID: {}".format(message.from_user.mention, message.from_user.id))

Important

If you still face any issue then click on the below button

Copying the post and using it without permission is strictly prohibited.

Post a Comment

Cookie Consent
We serve cookies on this site to analyze traffic, remember your preferences, and optimize your experience.
Oops!
It seems there is something wrong with your internet connection. Please connect to the internet and start browsing again.
AdBlock Detected!
We have detected that you are using adblocking plugin in your browser.
The revenue we earn by the advertisements is used to manage this website, we request you to whitelist our website in your adblocking plugin.
Site is Blocked
Sorry! This site is not available in your country.