init here

This commit is contained in:
2025-11-26 21:32:41 +03:00
commit 33c97acade
91 changed files with 9155 additions and 0 deletions

742
checker/checker.py Normal file
View File

@@ -0,0 +1,742 @@
#!/usr/bin/python3
import errno
import hashlib
import http.client
import json
import random
import socket
import string
import sys
import traceback
PORT = 1337
TIMEOUT = 3.0
K_FIRST_NAME = "имя"
K_LAST_NAME = "фамилия"
K_MIDDLE_NAME = "отчество"
K_EDUCATION = "образование"
K_PASSWORD = "пароль"
K_CONFIRM_PASSWORD = "повтор пароля"
K_NEW_PASSWORD = "новый пароль"
K_SNILS = "снилс"
K_PASSPORT = "паспорт"
K_CLASS_NUMBER = "номер"
K_CLASS_LETTER = "буква"
K_CLASSES = "классы"
K_IDENTIFIER = "идентификатор"
K_CREATOR = "создатель"
K_USERNAME = "имя пользователя"
K_LESSONS = "уроки"
K_LESSON_DATE = "дата"
K_LESSON_TITLE = "название"
K_LESSON_HOMEWORK = "домашнее задание"
LOGIN_SUCCESS = "Рады видеть вас снова :3"
TEACHER_FIRST_NAMES = [
"Анна",
"Мария",
"Ольга",
"Елена",
"Светлана",
"Татьяна",
"Юлия",
"Вера",
"Алёна",
"Наталья",
]
TEACHER_LAST_NAMES = [
"Иванова",
"Петрова",
"Сидорова",
"Федорова",
"Кузнецова",
"Попова",
"Соколова",
"Павлова",
"Зайцева",
"Орлова",
]
TEACHER_MIDDLE_NAMES = [
"Александровна",
"Сергеевна",
"Игоревна",
"Юрьевна",
"Владимировна",
"Николаевна",
"Павловна",
"Степановна",
"Олеговна",
"Георгиевна",
]
TEACHER_EDUCATIONS = [
"Математика",
"Физика",
"Химия",
"История",
"Биология",
"География",
"Литература",
"Информатика",
]
STUDENT_FIRST_NAMES = [
"Артём",
"Борис",
"Виктория",
"Глеб",
"Дарья",
"Егор",
"Злата",
"Илья",
"Кира",
"Лев",
"Маргарита",
"Никита",
]
STUDENT_LAST_NAMES = [
"Алексеев",
"Белов",
"Васильев",
"Громов",
"Демидов",
"Ершов",
"Журавлёв",
"Зуев",
"Исаев",
"Калинин",
"Лукин",
"Миронов",
]
STUDENT_MIDDLE_NAMES = [
"Андреевич",
"Борисович",
"Владимирович",
"Геннадьевич",
"Дмитриевич",
"Евгеньевич",
"Иванович",
"Константинович",
"Львович",
"Михайлович",
]
LESSON_TOPICS = [
"Алгебра",
"Геометрия",
"Физика",
"Химия",
"Биология",
"История",
"Информатика",
"Русский язык",
]
HOMEWORK_PHRASES = [
"Решить упражнения {token} в тетради.",
"Подготовить сообщение по теме {token}.",
"Прочитать параграф {token} и сделать конспект.",
"Собрать презентацию {token}.",
]
PASSWORD_WORDS = [
"Секрет",
"Пароль",
"Шифр",
"Тайна",
"Загадка",
"Форт",
]
CLASS_LETTERS = list("АБВГД")
_ERR_HOSTDOWN = getattr(errno, "EHOSTDOWN", None)
DOWN_ERRNOS = {
value
for value in (
errno.ECONNREFUSED,
errno.EHOSTUNREACH,
errno.ENETUNREACH,
_ERR_HOSTDOWN,
)
if value is not None
}
DEBUG = True
def service_up():
print("[service is worked] - 101")
exit(101)
def service_corrupt():
print("[service is corrupt] - 102")
exit(102)
def service_mumble():
print("[service is mumble] - 103")
exit(103)
def service_down():
print("[service is down] - 104")
exit(104)
def debug(message):
if DEBUG:
print(f"[debug] {message}", file=sys.stderr)
class CheckerError(Exception):
def __init__(self, verdict, message):
super().__init__(message)
self.verdict = verdict
class ApiClient:
def __init__(self, host, port=PORT):
self.host = host
self.port = port
def request(self, method, path, *, json_body=None, headers=None):
if not path.startswith("/"):
path = "/" + path
hdrs = {} if headers is None else dict(headers)
data = None
if json_body is not None:
data = json.dumps(json_body, ensure_ascii=False).encode("utf-8")
hdrs.setdefault(
"Content-Type",
"application/json; charset=utf-8",
)
conn = http.client.HTTPConnection(
self.host,
self.port,
timeout=TIMEOUT,
)
try:
conn.request(method, path, body=data, headers=hdrs)
response = conn.getresponse()
payload = response.read()
text = payload.decode("utf-8", "ignore")
return response.status, text, dict(response.getheaders())
except socket.timeout as exc:
raise CheckerError(service_mumble, "request timeout") from exc
except OSError as exc:
err_no = exc.errno
if err_no in DOWN_ERRNOS:
raise CheckerError(
service_down,
f"connection error: {exc}",
) from exc
if err_no == errno.ECONNRESET:
raise CheckerError(
service_mumble,
f"connection reset: {exc}",
) from exc
raise CheckerError(
service_mumble,
f"network error: {exc}",
) from exc
except http.client.HTTPException as exc:
raise CheckerError(
service_mumble,
f"http error: {exc}",
) from exc
finally:
try:
conn.close()
except Exception:
pass
def make_seed(flag_id, flag):
digest = hashlib.sha256(f"{flag_id}:{flag}".encode("utf-8")).digest()
return int.from_bytes(digest, "big")
def token(rng, length):
alphabet = string.ascii_uppercase + string.digits
return "".join(rng.choice(alphabet) for _ in range(length))
def numeric(rng, length):
return "".join(rng.choice(string.digits) for _ in range(length))
def build_teacher(rng):
suffix = token(rng, 5)
first = rng.choice(TEACHER_FIRST_NAMES) + suffix
last = rng.choice(TEACHER_LAST_NAMES) + suffix
middle = rng.choice(TEACHER_MIDDLE_NAMES) + suffix
education = rng.choice(TEACHER_EDUCATIONS)
password = rng.choice(PASSWORD_WORDS) + token(rng, 5) + numeric(rng, 2)
username = f"{first}.{last}"
return {
"first": first,
"last": last,
"middle": middle,
"education": education,
"password": password,
"username": username,
}
def build_student(rng, flagged, flag):
suffix = token(rng, 3)
first = rng.choice(STUDENT_FIRST_NAMES) + suffix
last = rng.choice(STUDENT_LAST_NAMES) + suffix
middle = rng.choice(STUDENT_MIDDLE_NAMES) + suffix
password = rng.choice(PASSWORD_WORDS) + suffix + numeric(rng, 2)
snils = flag if flagged else numeric(rng, 11)
passport = "P" + numeric(rng, 8)
username = f"{first}.{last}"
return {
"first": first,
"last": last,
"middle": middle,
"password": password,
"snils": snils,
"passport": passport,
"username": username,
"is_flagged": flagged,
}
def build_lesson(rng, class_index, lesson_index):
base_day = 1 + class_index * 4 + lesson_index
day = 1 + base_day % 27
date = f"2025-09-{day:02d}"
topic = rng.choice(LESSON_TOPICS) + " " + token(rng, 2)
homework = rng.choice(HOMEWORK_PHRASES).format(token=token(rng, 3))
return {
"date": date,
"title": topic,
"homework": homework,
}
def generate_plan(flag_id, flag):
rng = random.Random(make_seed(flag_id, flag))
teacher = build_teacher(rng)
class_count = 1 + rng.randint(0, 2)
class_specs = []
total_students = 0
for _ in range(class_count):
student_count = 1 + rng.randint(0, 2)
lesson_count = 2 + rng.randint(0, 1)
class_specs.append(
{
"number": rng.randint(1, 11),
"letter": rng.choice(CLASS_LETTERS),
"student_count": student_count,
"lesson_count": lesson_count,
}
)
total_students += student_count
flagged_index = rng.randrange(total_students)
current_index = 0
classes = []
for class_idx, spec in enumerate(class_specs):
students = []
for _ in range(spec["student_count"]):
is_flagged = current_index == flagged_index
students.append(build_student(rng, is_flagged, flag))
current_index += 1
lessons = [
build_lesson(rng, class_idx, li)
for li in range(spec["lesson_count"])
]
classes.append(
{
"number": spec["number"],
"letter": spec["letter"],
"students": students,
"lessons": lessons,
}
)
return {
"teacher": teacher,
"classes": classes,
"flag": flag,
}
def all_students(plan):
for class_spec in plan["classes"]:
for student in class_spec["students"]:
yield student
def make_auth_header(teacher):
return {
"Authorization": (
f"Basic {teacher['username']} {teacher['password']}".encode(
"utf-8"
)
),
}
def parse_json_payload(text, context):
try:
return json.loads(text) if text else {}
except json.JSONDecodeError as exc:
raise CheckerError(
service_corrupt,
f"invalid json in {context}: {exc}",
) from exc
def login_teacher(client, teacher):
payload = {
K_USERNAME: teacher["username"],
K_PASSWORD: teacher["password"],
}
status, body, _ = client.request(
"POST",
"/api/users/login",
json_body=payload,
)
body_text = body.strip()
if status == 200 and body_text == LOGIN_SUCCESS:
return True
if status == 401:
return False
raise CheckerError(
service_corrupt,
f"unexpected login status {status} body={body_text!r}",
)
def ensure_teacher(client, plan):
teacher = plan["teacher"]
if login_teacher(client, teacher):
return make_auth_header(teacher)
payload = {
K_FIRST_NAME: teacher["first"],
K_LAST_NAME: teacher["last"],
K_MIDDLE_NAME: teacher["middle"],
K_EDUCATION: teacher["education"],
K_PASSWORD: teacher["password"],
K_CONFIRM_PASSWORD: teacher["password"],
}
status, _, _ = client.request("POST", "/api/users", json_body=payload)
if status != 201:
raise CheckerError(
service_corrupt,
f"teacher create failed with status {status}",
)
if not login_teacher(client, teacher):
raise CheckerError(
service_corrupt,
"teacher login failed after registration",
)
return make_auth_header(teacher)
def fetch_classes(client, auth):
status, body, _ = client.request("GET", "/api/classes", headers=auth)
if status != 200:
raise CheckerError(
service_corrupt,
f"unexpected classes status {status}",
)
data = parse_json_payload(body, "classes list")
classes = data.get(K_CLASSES, [])
if not isinstance(classes, list):
raise CheckerError(service_corrupt, "classes payload malformed")
return classes
def find_class(classes, number, letter):
for entry in classes:
if (
isinstance(entry, dict)
and entry.get(K_CLASS_NUMBER) == number
and entry.get(K_CLASS_LETTER) == letter
):
return entry
return None
def ensure_classes(client, plan, auth):
classes_payload = fetch_classes(client, auth)
for class_spec in plan["classes"]:
existing = find_class(
classes_payload,
class_spec["number"],
class_spec["letter"],
)
if existing is None:
payload = {
K_CLASS_NUMBER: class_spec["number"],
K_CLASS_LETTER: class_spec["letter"],
}
status, body, _ = client.request(
"POST",
"/api/classes",
json_body=payload,
headers=auth,
)
if status != 201:
raise CheckerError(
service_corrupt,
f"class create failed with status {status}",
)
created = parse_json_payload(body, "class create")
classes_payload.append(created)
class_spec["id"] = int(created.get(K_IDENTIFIER))
else:
class_spec["id"] = int(existing.get(K_IDENTIFIER))
def fetch_students(client, auth):
status, body, _ = client.request("GET", "/api/students", headers=auth)
if status != 200:
raise CheckerError(
service_corrupt,
f"unexpected students status {status}",
)
data = parse_json_payload(body, "students list")
students = data.get("\u0443\u0447\u0435\u043d\u0438\u043a\u0438", [])
if not isinstance(students, list):
raise CheckerError(service_corrupt, "students payload malformed")
return students
def ensure_students(client, plan, auth):
student_specs = list(all_students(plan))
if not student_specs:
return
students_payload = fetch_students(client, auth)
snils_map = {
entry.get(K_SNILS): entry
for entry in students_payload
if isinstance(entry, dict)
}
missing = [
spec
for spec in student_specs
if spec["snils"] not in snils_map
]
for spec in missing:
payload = {
K_FIRST_NAME: spec["first"],
K_LAST_NAME: spec["last"],
K_MIDDLE_NAME: spec["middle"],
K_PASSWORD: spec["password"],
K_CONFIRM_PASSWORD: spec["password"],
K_SNILS: spec["snils"],
K_PASSPORT: spec["passport"],
}
status, _, _ = client.request(
"POST",
"/api/students",
json_body=payload,
headers=auth,
)
if status != 201:
raise CheckerError(
service_corrupt,
f"student create failed with status {status}",
)
if missing:
students_payload = fetch_students(client, auth)
snils_map = {
entry.get(K_SNILS): entry
for entry in students_payload
if isinstance(entry, dict)
}
for spec in student_specs:
entry = snils_map.get(spec["snils"])
if entry is None:
raise CheckerError(
service_corrupt,
"student missing after creation",
)
spec["id"] = int(entry.get(K_IDENTIFIER))
def add_students_to_classes(client, plan, auth):
for class_spec in plan["classes"]:
class_id = class_spec.get("id")
if class_id is None:
raise CheckerError(
service_corrupt,
"class id missing during assignment",
)
for student in class_spec["students"]:
student_id = student.get("id")
if student_id is None:
raise CheckerError(
service_corrupt,
"student id missing during assignment",
)
path = f"/api/classes/{class_id}/students/{student_id}"
status, _, _ = client.request("POST", path, headers=auth)
if status not in (201, 422):
raise CheckerError(
service_corrupt,
f"assignment failed with status {status}",
)
def fetch_lessons(client, auth, class_id):
path = f"/api/classes/{class_id}/lessons"
status, body, _ = client.request("GET", path, headers=auth)
if status != 200:
raise CheckerError(
service_corrupt,
f"unexpected lessons status {status}",
)
data = parse_json_payload(body, "lessons list")
lessons = data.get(K_LESSONS, [])
if not isinstance(lessons, list):
raise CheckerError(service_corrupt, "lessons payload malformed")
return lessons
def match_lesson(entry, lesson_spec):
return (
isinstance(entry, dict)
and entry.get(K_LESSON_DATE) == lesson_spec["date"]
and entry.get(K_LESSON_TITLE) == lesson_spec["title"]
)
def ensure_lessons(client, plan, auth):
for class_spec in plan["classes"]:
class_id = class_spec.get("id")
if class_id is None:
raise CheckerError(service_corrupt, "class id missing for lessons")
lessons_payload = fetch_lessons(client, auth, class_id)
for lesson_spec in class_spec["lessons"]:
existing = None
for entry in lessons_payload:
if match_lesson(entry, lesson_spec):
existing = entry
break
if existing is None:
payload = {
K_LESSON_DATE: lesson_spec["date"],
K_LESSON_TITLE: lesson_spec["title"],
K_LESSON_HOMEWORK: lesson_spec["homework"],
}
status, body, _ = client.request(
"POST",
f"/api/classes/{class_id}/lessons",
json_body=payload,
headers=auth,
)
if status != 201:
raise CheckerError(
service_corrupt,
f"lesson create failed with status {status}",
)
created = parse_json_payload(body, "lesson create")
lessons_payload.append(created)
def perform_put(client, plan):
auth = ensure_teacher(client, plan)
ensure_classes(client, plan, auth)
ensure_students(client, plan, auth)
add_students_to_classes(client, plan, auth)
ensure_lessons(client, plan, auth)
def verify_classes(client, plan, auth):
classes_payload = fetch_classes(client, auth)
for class_spec in plan["classes"]:
entry = find_class(
classes_payload,
class_spec["number"],
class_spec["letter"],
)
if entry is None:
raise CheckerError(service_corrupt, "expected class missing")
class_spec["id"] = int(entry.get(K_IDENTIFIER))
lessons_payload = fetch_lessons(client, auth, class_spec["id"])
for lesson_spec in class_spec["lessons"]:
if not any(
match_lesson(item, lesson_spec)
for item in lessons_payload
):
raise CheckerError(service_corrupt, "expected lesson missing")
def verify_students(client, plan, auth):
students_payload = fetch_students(client, auth)
snils_map = {
entry.get(K_SNILS): entry
for entry in students_payload
if isinstance(entry, dict)
}
for student in all_students(plan):
entry = snils_map.get(student["snils"])
if entry is None:
raise CheckerError(service_corrupt, "expected student missing")
if entry.get(K_FIRST_NAME) != student["first"]:
raise CheckerError(service_corrupt, "student first name mismatch")
if entry.get(K_LAST_NAME) != student["last"]:
raise CheckerError(service_corrupt, "student last name mismatch")
if entry.get(K_MIDDLE_NAME) != student["middle"]:
raise CheckerError(service_corrupt, "student middle name mismatch")
if entry.get(K_PASSPORT) != student["passport"]:
raise CheckerError(service_corrupt, "student passport mismatch")
if entry.get(K_USERNAME) != student["username"]:
raise CheckerError(service_corrupt, "student username mismatch")
if student["is_flagged"] and entry.get(K_SNILS) != plan["flag"]:
raise CheckerError(service_corrupt, "flag storage mismatch")
def perform_check(client, plan):
teacher = plan["teacher"]
if not login_teacher(client, teacher):
raise CheckerError(service_corrupt, "teacher credentials rejected")
auth = make_auth_header(teacher)
verify_classes(client, plan, auth)
verify_students(client, plan, auth)
if len(sys.argv) != 5:
print(
f"\nUsage:\n\t{sys.argv[0]} <host> (put|check) <flag_id> <flag>\n",
file=sys.stderr,
)
exit(1)
def main():
host = sys.argv[1]
command = sys.argv[2]
flag_id = sys.argv[3]
flag = sys.argv[4]
plan = generate_plan(flag_id, flag)
client = ApiClient(host)
try:
if command == "put":
perform_put(client, plan)
perform_check(client, plan)
service_up()
elif command == "check":
perform_check(client, plan)
service_up()
else:
raise CheckerError(service_corrupt, f"unknown command {command!r}")
except CheckerError as err:
debug(f"failure: {err}")
debug(traceback.format_exc())
err.verdict()
except Exception as exc:
debug(f"unexpected error: {exc}")
debug(traceback.format_exc())
service_mumble()
if __name__ == "__main__":
main()