It seems there is something wrong with your internet connection. Please connect to the internet and start browsing again.
ReloadWhen turned on automatically changes
the theme color on reload.
When turned on automatically changes
the theme color every 5 sec.
MongoDB is a NoSQL database commonly used for Telegram bots to store user data and configurations. It offers a flexible schema and efficient querying, making it ideal for dynamic data. With libraries like pymongo, developers can easily integrate it into their bots.
Follow the below steps to add MongoDB database in any repository
1. Add the code given below in your "requirements.txt" file
motor
DB_URI = os.environ.get("DB_URI", "")
DB_NAME = os.environ.get("DB_NAME", "TechifyBots")
LOG_CHANNEL = int(os.environ.get("LOG_CHANNEL", ""))
from typing import Any
from config import DB_URI, DB_NAME
from motor import motor_asyncio
from pymongo import ReturnDocument
from pymongo.errors import DuplicateKeyError
from bson import ObjectId
client = motor_asyncio.AsyncIOMotorClient(DB_URI)
db = client[DB_NAME]
class Techifybots:
def __init__(self):
self.users = db["users"]
self.cache: dict[int, dict[str, Any]] = {}
async def add_user(self, user_id: int, name: str) -> dict[str, Any] | None:
try:
user = {"user_id": int(user_id), "name": name}
saved = await self.users.find_one_and_update(
{"user_id": user["user_id"]},
{"$set": user},
upsert=True,
return_document=ReturnDocument.AFTER
)
if saved:
self.cache[user["user_id"]] = saved
return saved
except DuplicateKeyError:
existing = await self.users.find_one({"user_id": int(user_id)})
if existing:
self.cache[int(user_id)] = existing
return existing
except Exception as e:
print("Error in add_user:", e)
return None
async def get_user(self, user_id: int) -> dict[str, Any] | None:
try:
if user_id in self.cache:
return self.cache[user_id]
user = await self.users.find_one({"user_id": int(user_id)})
if user:
self.cache[user_id] = user
return user
except Exception as e:
print("Error in get_user:", e)
return None
async def get_all_users(self) -> list[dict[str, Any]]:
try:
users: list[dict[str, Any]] = []
async for user in self.users.find():
users.append(user)
return users
except Exception as e:
print("Error in get_all_users:", e)
return []
async def delete_user(self, identifier: int | str | ObjectId) -> bool:
try:
query = {}
if isinstance(identifier, int):
query = {"user_id": identifier}
self.cache.pop(identifier, None)
elif isinstance(identifier, (str, ObjectId)):
query = {"_id": ObjectId(identifier)} if isinstance(identifier, str) else {"_id": identifier}
doc = await self.users.find_one(query)
if doc and "user_id" in doc:
self.cache.pop(int(doc["user_id"]), None)
else:
return False
result = await self.users.delete_one(query)
return result.deleted_count > 0
except Exception as e:
print("Error in delete_user:", e)
return False
tb = Techifybots()
if await tb.get_user(message.from_user.id) is None:
await tb.add_user(message.from_user.id, message.from_user.first_name)
await client.send_message(LOG_CHANNEL, text="#New_User\n\nUser: {}\nID: {}".format(message.from_user.mention, message.from_user.id))
Copying the post and using it without permission is strictly prohibited.