157 lines
4.5 KiB
Python
157 lines
4.5 KiB
Python
import asyncio
|
||
import hashlib
|
||
import hmac
|
||
from tomllib import load
|
||
|
||
import requests
|
||
from aiogram import Bot, Dispatcher
|
||
from aiogram.enums import ChatMemberStatus
|
||
from aiogram.filters import Command
|
||
from aiogram.types import Message
|
||
from tinydb import Query, TinyDB
|
||
from dotenv import load_dotenv
|
||
|
||
load_dotenv()
|
||
import os
|
||
# ================= CONFIG =================
|
||
|
||
db = TinyDB("/storage/users.json")
|
||
Users = Query()
|
||
|
||
BOT_TOKEN = os.getenv("BOT_TOKEN")
|
||
|
||
MATRIX_SERVER = os.getenv("MATRIX_SERVER")
|
||
SHARED_SECRET = os.getenv("SHARED_SECRET")
|
||
MAX_MATRIX_ACCOUNTS = int(os.getenv("MAX_MATRIX_ACCOUNTS"))
|
||
# я хотел еще сделать лимит на подписчиков гринвещает и моих котов но стало лень
|
||
REQUIRED_CHANNELS = [
|
||
-1002308592767, # channel 1 ID
|
||
-1002032804724, # channel 2 ID
|
||
]
|
||
|
||
# =========================================
|
||
|
||
|
||
def can_create_matrix_account(telegram_id: int) -> bool:
|
||
user = db.get(Users.telegram_id == telegram_id)
|
||
if not user:
|
||
return True
|
||
return len(user.get("matrix_accounts", [])) < MAX_MATRIX_ACCOUNTS
|
||
|
||
|
||
def add_matrix_account(telegram_id: int, username: str):
|
||
user = db.get(Users.telegram_id == telegram_id)
|
||
|
||
if user:
|
||
accounts = user.get("matrix_accounts", [])
|
||
accounts.append(username)
|
||
db.update({"matrix_accounts": accounts}, Users.telegram_id == telegram_id)
|
||
else:
|
||
db.insert({"telegram_id": telegram_id, "matrix_accounts": [username]})
|
||
|
||
|
||
def generate_mac(nonce: str, username: str, password: str, admin: bool = False) -> str:
|
||
msg = f"{nonce}\0{username}\0{password}\0{'admin' if admin else 'notadmin'}"
|
||
return hmac.new(SHARED_SECRET.encode(), msg.encode(), hashlib.sha1).hexdigest()
|
||
|
||
|
||
async def is_subscribed(bot: Bot, user_id: int) -> bool:
|
||
"""
|
||
Returns True if user is member of AT LEAST ONE required channel
|
||
"""
|
||
for channel_id in REQUIRED_CHANNELS:
|
||
try:
|
||
member = await bot.get_chat_member(channel_id, user_id)
|
||
if member.status in (
|
||
ChatMemberStatus.MEMBER,
|
||
ChatMemberStatus.ADMINISTRATOR,
|
||
ChatMemberStatus.OWNER,
|
||
):
|
||
return True
|
||
except Exception:
|
||
continue
|
||
return False
|
||
|
||
|
||
async def create_user(message: Message, bot: Bot):
|
||
telegram_id = message.from_user.id
|
||
|
||
# 🔒 limit check
|
||
if not can_create_matrix_account(telegram_id):
|
||
await message.answer(
|
||
"🚫 Limit reached!\n\n"
|
||
"You can create **up to 3 Matrix accounts** per Telegram account."
|
||
)
|
||
return
|
||
|
||
# 2️⃣ parse args
|
||
args = message.text.split(maxsplit=2)
|
||
if len(args) != 3:
|
||
await message.answer("Usage:\n" "/create username password")
|
||
return
|
||
|
||
username, password = args[1], args[2]
|
||
|
||
try:
|
||
# 3️⃣ get nonce
|
||
nonce_resp = requests.get(
|
||
f"{MATRIX_SERVER}/_synapse/admin/v1/register", timeout=10
|
||
)
|
||
nonce_resp.raise_for_status()
|
||
nonce = nonce_resp.json()["nonce"]
|
||
|
||
# 4️⃣ generate MAC
|
||
mac = generate_mac(nonce, username, password)
|
||
|
||
# 5️⃣ register user
|
||
payload = {
|
||
"nonce": nonce,
|
||
"username": username,
|
||
"password": password,
|
||
"mac": mac,
|
||
"admin": False,
|
||
}
|
||
|
||
r = requests.post(
|
||
f"{MATRIX_SERVER}/_synapse/admin/v1/register", json=payload, timeout=10
|
||
)
|
||
|
||
if r.status_code == 200:
|
||
add_matrix_account(message.from_user.id, username)
|
||
domain = MATRIX_SERVER.replace("https://", "").replace("http://", "")
|
||
await message.answer(
|
||
"✅ Matrix account created!\n\n" f"👤 User: @{username}:{domain}"
|
||
)
|
||
await bot.send_message(
|
||
-5087229310,
|
||
f"""matrix [user](tg://user?id={message.from_user.id}) {message.from_user.username} is created""",
|
||
parse_mode="markdown",
|
||
)
|
||
else:
|
||
await message.answer("❌ Failed to create account\n\n" f"{r.text}")
|
||
|
||
except Exception as e:
|
||
await message.answer(f"⚠️ Error:\n{e}")
|
||
|
||
|
||
async def start(message: Message):
|
||
await message.answer(
|
||
"👋 Welcome!\n\n"
|
||
"To create a Matrix account:\n"
|
||
"`/create username password`\n\n"
|
||
)
|
||
|
||
|
||
async def main():
|
||
bot = Bot(BOT_TOKEN)
|
||
dp = Dispatcher()
|
||
|
||
dp.message.register(start, Command("start"))
|
||
dp.message.register(create_user, Command("create"))
|
||
|
||
await dp.start_polling(bot)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|