#!/usr/bin/python3
"""Service checker: seeds and verifies data through the service's HTTP API.

Invocation: ``checker HOST (put|check) FLAG_ID FLAG``.

* ``put`` logs in (registering first if needed) as a generated teacher,
  creates classes, students and lessons, then runs the same verification
  as ``check``.  Exactly one generated student carries the flag in the
  SNILS field.
* ``check`` logs in as the teacher and verifies that the classes, lessons
  and students from the plan are present.

All generated data is derived deterministically from ``FLAG_ID`` and
``FLAG``, so ``check`` can rebuild the plan that ``put`` used.

The process exit code is the verdict: 101 (up), 102 (corrupt),
103 (mumble), 104 (down); 1 is used for a bad command line.
"""
import errno
import hashlib
import http.client
import json
import random
import socket
import string
import sys
import traceback

PORT = 1337
TIMEOUT = 3.0

# JSON keys used by the service API.
K_FIRST_NAME = "имя"
K_LAST_NAME = "фамилия"
K_MIDDLE_NAME = "отчество"
K_EDUCATION = "образование"
K_PASSWORD = "пароль"
K_CONFIRM_PASSWORD = "повтор пароля"
K_NEW_PASSWORD = "новый пароль"
K_SNILS = "снилс"
K_PASSPORT = "паспорт"
K_CLASS_NUMBER = "номер"
K_CLASS_LETTER = "буква"
K_CLASSES = "классы"
K_IDENTIFIER = "идентификатор"
K_CREATOR = "создатель"
K_USERNAME = "имя пользователя"
K_LESSONS = "уроки"
K_LESSON_DATE = "дата"
K_LESSON_TITLE = "название"
K_LESSON_HOMEWORK = "домашнее задание"
# Key of the student list in the /api/students response ("ученики").
K_STUDENTS = "\u0443\u0447\u0435\u043d\u0438\u043a\u0438"

# Exact body the service returns on a successful login.
LOGIN_SUCCESS = "Рады видеть вас снова :3"

TEACHER_FIRST_NAMES = [
    "Анна",
    "Мария",
    "Ольга",
    "Елена",
    "Светлана",
    "Татьяна",
    "Юлия",
    "Вера",
    "Алёна",
    "Наталья",
]
TEACHER_LAST_NAMES = [
    "Иванова",
    "Петрова",
    "Сидорова",
    "Федорова",
    "Кузнецова",
    "Попова",
    "Соколова",
    "Павлова",
    "Зайцева",
    "Орлова",
]
TEACHER_MIDDLE_NAMES = [
    "Александровна",
    "Сергеевна",
    "Игоревна",
    "Юрьевна",
    "Владимировна",
    "Николаевна",
    "Павловна",
    "Степановна",
    "Олеговна",
    "Георгиевна",
]
TEACHER_EDUCATIONS = [
    "Математика",
    "Физика",
    "Химия",
    "История",
    "Биология",
    "География",
    "Литература",
    "Информатика",
]
STUDENT_FIRST_NAMES = [
    "Артём",
    "Борис",
    "Виктория",
    "Глеб",
    "Дарья",
    "Егор",
    "Злата",
    "Илья",
    "Кира",
    "Лев",
    "Маргарита",
    "Никита",
]
STUDENT_LAST_NAMES = [
    "Алексеев",
    "Белов",
    "Васильев",
    "Громов",
    "Демидов",
    "Ершов",
    "Журавлёв",
    "Зуев",
    "Исаев",
    "Калинин",
    "Лукин",
    "Миронов",
]
STUDENT_MIDDLE_NAMES = [
    "Андреевич",
    "Борисович",
    "Владимирович",
    "Геннадьевич",
    "Дмитриевич",
    "Евгеньевич",
    "Иванович",
    "Константинович",
    "Львович",
    "Михайлович",
]
LESSON_TOPICS = [
    "Алгебра",
    "Геометрия",
    "Физика",
    "Химия",
    "Биология",
    "История",
    "Информатика",
    "Русский язык",
]
HOMEWORK_PHRASES = [
    "Решить упражнения {token} в тетради.",
    "Подготовить сообщение по теме {token}.",
    "Прочитать параграф {token} и сделать конспект.",
    "Собрать презентацию {token}.",
]
PASSWORD_WORDS = [
    "Секрет",
    "Пароль",
    "Шифр",
    "Тайна",
    "Загадка",
    "Форт",
]
CLASS_LETTERS = list("АБВГД")

# EHOSTDOWN does not exist on every platform, so it is looked up lazily.
_ERR_HOSTDOWN = getattr(errno, "EHOSTDOWN", None)
# errno values that are reported as "service down" rather than "mumble".
DOWN_ERRNOS = {
    value
    for value in (
        errno.ECONNREFUSED,
        errno.EHOSTUNREACH,
        errno.ENETUNREACH,
        _ERR_HOSTDOWN,
    )
    if value is not None
}

DEBUG = True


def service_up():
    """Report the "up" verdict and exit with code 101."""
    print("[service is worked] - 101")
    sys.exit(101)


def service_corrupt():
    """Report the "corrupt" verdict and exit with code 102."""
    print("[service is corrupt] - 102")
    sys.exit(102)


def service_mumble():
    """Report the "mumble" verdict and exit with code 103."""
    print("[service is mumble] - 103")
    sys.exit(103)


def service_down():
    """Report the "down" verdict and exit with code 104."""
    print("[service is down] - 104")
    sys.exit(104)


def debug(message):
    """Write *message* to stderr when ``DEBUG`` is enabled."""
    if DEBUG:
        print(f"[debug] {message}", file=sys.stderr)


class CheckerError(Exception):
    """A check failure that carries the verdict to report.

    ``verdict`` is one of the ``service_*`` functions; ``main`` calls it
    to print the verdict and exit with the matching code.
    """

    def __init__(self, verdict, message):
        super().__init__(message)
        self.verdict = verdict


class ApiClient:
    """Minimal HTTP client for the service; one connection per request."""

    def __init__(self, host, port=PORT):
        self.host = host
        self.port = port

    def request(self, method, path, *, json_body=None, headers=None):
        """Send one HTTP request and return ``(status, text, headers)``.

        ``json_body``, when given, is serialised as UTF-8 JSON and a JSON
        ``Content-Type`` is added unless the caller supplied one.  The
        response body is decoded as UTF-8, ignoring undecodable bytes.

        Raises:
            CheckerError: with ``service_down`` for the errno values in
                ``DOWN_ERRNOS``, and with ``service_mumble`` for timeouts,
                other socket errors and HTTP protocol errors.
        """
        if not path.startswith("/"):
            path = "/" + path
        hdrs = {} if headers is None else dict(headers)
        data = None
        if json_body is not None:
            data = json.dumps(json_body, ensure_ascii=False).encode("utf-8")
            hdrs.setdefault(
                "Content-Type",
                "application/json; charset=utf-8",
            )
        conn = http.client.HTTPConnection(
            self.host,
            self.port,
            timeout=TIMEOUT,
        )
        try:
            conn.request(method, path, body=data, headers=hdrs)
            response = conn.getresponse()
            payload = response.read()
            text = payload.decode("utf-8", "ignore")
            return response.status, text, dict(response.getheaders())
        except socket.timeout as exc:
            # Must precede the OSError clause: socket.timeout is an OSError.
            raise CheckerError(service_mumble, "request timeout") from exc
        except OSError as exc:
            err_no = exc.errno
            if err_no in DOWN_ERRNOS:
                raise CheckerError(
                    service_down,
                    f"connection error: {exc}",
                ) from exc
            if err_no == errno.ECONNRESET:
                raise CheckerError(
                    service_mumble,
                    f"connection reset: {exc}",
                ) from exc
            raise CheckerError(
                service_mumble,
                f"network error: {exc}",
            ) from exc
        except http.client.HTTPException as exc:
            raise CheckerError(
                service_mumble,
                f"http error: {exc}",
            ) from exc
        finally:
            # Best-effort cleanup: a failure to close must not mask the
            # result or the error produced above.
            try:
                conn.close()
            except Exception:
                pass


def make_seed(flag_id, flag):
    """Return an integer seed derived from SHA-256 of ``flag_id:flag``."""
    digest = hashlib.sha256(f"{flag_id}:{flag}".encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def token(rng, length):
    """Return *length* random uppercase ASCII letters/digits from *rng*."""
    alphabet = string.ascii_uppercase + string.digits
    return "".join(rng.choice(alphabet) for _ in range(length))


def numeric(rng, length):
    """Return a string of *length* random decimal digits from *rng*."""
    return "".join(rng.choice(string.digits) for _ in range(length))


def build_teacher(rng):
    """Generate the teacher record (names, education, password, username)."""
    suffix = token(rng, 5)
    first = rng.choice(TEACHER_FIRST_NAMES) + suffix
    last = rng.choice(TEACHER_LAST_NAMES) + suffix
    middle = rng.choice(TEACHER_MIDDLE_NAMES) + suffix
    education = rng.choice(TEACHER_EDUCATIONS)
    password = rng.choice(PASSWORD_WORDS) + token(rng, 5) + numeric(rng, 2)
    username = f"{first}.{last}"
    return {
        "first": first,
        "last": last,
        "middle": middle,
        "education": education,
        "password": password,
        "username": username,
    }


def build_student(rng, flagged, flag):
    """Generate a student record.

    When *flagged* is true the SNILS field holds *flag*; otherwise it is
    11 random digits.
    """
    suffix = token(rng, 3)
    first = rng.choice(STUDENT_FIRST_NAMES) + suffix
    last = rng.choice(STUDENT_LAST_NAMES) + suffix
    middle = rng.choice(STUDENT_MIDDLE_NAMES) + suffix
    password = rng.choice(PASSWORD_WORDS) + suffix + numeric(rng, 2)
    snils = flag if flagged else numeric(rng, 11)
    passport = "P" + numeric(rng, 8)
    username = f"{first}.{last}"
    return {
        "first": first,
        "last": last,
        "middle": middle,
        "password": password,
        "snils": snils,
        "passport": passport,
        "username": username,
        "is_flagged": flagged,
    }


def build_lesson(rng, class_index, lesson_index):
    """Generate a lesson record with a September 2025 date (day 1..27)."""
    base_day = 1 + class_index * 4 + lesson_index
    day = 1 + base_day % 27
    date = f"2025-09-{day:02d}"
    topic = rng.choice(LESSON_TOPICS) + " " + token(rng, 2)
    homework = rng.choice(HOMEWORK_PHRASES).format(token=token(rng, 3))
    return {
        "date": date,
        "title": topic,
        "homework": homework,
    }


def generate_plan(flag_id, flag):
    """Build the deterministic data plan for ``(flag_id, flag)``.

    Returns a dict with the ``teacher``, a list of ``classes`` (1..3, each
    with 1..3 students and 2..3 lessons) and the ``flag``.  Exactly one
    student across all classes is flagged.  The order of draws from the
    random generator is part of the contract: ``put`` and ``check`` must
    produce the same plan.
    """
    rng = random.Random(make_seed(flag_id, flag))
    teacher = build_teacher(rng)
    class_count = 1 + rng.randint(0, 2)
    class_specs = []
    total_students = 0
    for _ in range(class_count):
        student_count = 1 + rng.randint(0, 2)
        lesson_count = 2 + rng.randint(0, 1)
        class_specs.append(
            {
                "number": rng.randint(1, 11),
                "letter": rng.choice(CLASS_LETTERS),
                "student_count": student_count,
                "lesson_count": lesson_count,
            }
        )
        total_students += student_count
    flagged_index = rng.randrange(total_students)
    current_index = 0
    classes = []
    for class_idx, spec in enumerate(class_specs):
        students = []
        for _ in range(spec["student_count"]):
            is_flagged = current_index == flagged_index
            students.append(build_student(rng, is_flagged, flag))
            current_index += 1
        lessons = [
            build_lesson(rng, class_idx, li)
            for li in range(spec["lesson_count"])
        ]
        classes.append(
            {
                "number": spec["number"],
                "letter": spec["letter"],
                "students": students,
                "lessons": lessons,
            }
        )
    return {
        "teacher": teacher,
        "classes": classes,
        "flag": flag,
    }


def all_students(plan):
    """Yield every student record of every class in *plan*."""
    for class_spec in plan["classes"]:
        for student in class_spec["students"]:
            yield student


def make_auth_header(teacher):
    """Return the ``Authorization`` header dict for *teacher*.

    The value is passed as UTF-8 bytes: http.client encodes ``str`` header
    values as Latin-1, which cannot represent the Cyrillic user names.
    """
    return {
        "Authorization": (
            f"Basic {teacher['username']} {teacher['password']}".encode(
                "utf-8"
            )
        ),
    }


def parse_json_payload(text, context):
    """Parse *text* as JSON; an empty body is treated as ``{}``.

    Raises:
        CheckerError: ``service_corrupt`` when the text is not valid JSON.
    """
    try:
        return json.loads(text) if text else {}
    except json.JSONDecodeError as exc:
        raise CheckerError(
            service_corrupt,
            f"invalid json in {context}: {exc}",
        ) from exc


def extract_identifier(entry, context):
    """Return the integer identifier stored in the API object *entry*.

    Raises:
        CheckerError: ``service_corrupt`` when *entry* is not a JSON
            object or its identifier is missing or not convertible to
            ``int`` (instead of leaking a bare TypeError/ValueError).
    """
    if not isinstance(entry, dict):
        raise CheckerError(
            service_corrupt,
            f"{context} payload malformed",
        )
    try:
        return int(entry.get(K_IDENTIFIER))
    except (TypeError, ValueError) as exc:
        raise CheckerError(
            service_corrupt,
            f"{context} identifier missing or invalid",
        ) from exc


def fetch_collection(client, auth, path, key, label):
    """GET *path* and return the list stored under *key* in the response.

    *label* names the collection in error messages.  A missing *key*
    yields an empty list.

    Raises:
        CheckerError: ``service_corrupt`` on a non-200 status, invalid
            JSON, a non-object payload or a non-list value under *key*.
    """
    status, body, _ = client.request("GET", path, headers=auth)
    if status != 200:
        raise CheckerError(
            service_corrupt,
            f"unexpected {label} status {status}",
        )
    data = parse_json_payload(body, f"{label} list")
    if not isinstance(data, dict):
        raise CheckerError(service_corrupt, f"{label} payload malformed")
    items = data.get(key, [])
    if not isinstance(items, list):
        raise CheckerError(service_corrupt, f"{label} payload malformed")
    return items


def login_teacher(client, teacher):
    """Try to log in as *teacher*.

    Returns ``True`` on status 200 with the expected greeting, ``False``
    on status 401.

    Raises:
        CheckerError: ``service_corrupt`` for any other response.
    """
    payload = {
        K_USERNAME: teacher["username"],
        K_PASSWORD: teacher["password"],
    }
    status, body, _ = client.request(
        "POST",
        "/api/users/login",
        json_body=payload,
    )
    body_text = body.strip()
    if status == 200 and body_text == LOGIN_SUCCESS:
        return True
    if status == 401:
        return False
    raise CheckerError(
        service_corrupt,
        f"unexpected login status {status} body={body_text!r}",
    )


def ensure_teacher(client, plan):
    """Log in as the plan's teacher, registering first if login fails.

    Returns the ``Authorization`` header dict for subsequent requests.

    Raises:
        CheckerError: ``service_corrupt`` when registration does not
            return 201 or login still fails afterwards.
    """
    teacher = plan["teacher"]
    if login_teacher(client, teacher):
        return make_auth_header(teacher)
    payload = {
        K_FIRST_NAME: teacher["first"],
        K_LAST_NAME: teacher["last"],
        K_MIDDLE_NAME: teacher["middle"],
        K_EDUCATION: teacher["education"],
        K_PASSWORD: teacher["password"],
        K_CONFIRM_PASSWORD: teacher["password"],
    }
    status, _, _ = client.request("POST", "/api/users", json_body=payload)
    if status != 201:
        raise CheckerError(
            service_corrupt,
            f"teacher create failed with status {status}",
        )
    if not login_teacher(client, teacher):
        raise CheckerError(
            service_corrupt,
            "teacher login failed after registration",
        )
    return make_auth_header(teacher)


def fetch_classes(client, auth):
    """Return the list of classes from ``GET /api/classes``."""
    return fetch_collection(
        client, auth, "/api/classes", K_CLASSES, "classes"
    )


def find_class(classes, number, letter):
    """Return the first dict in *classes* matching number and letter."""
    for entry in classes:
        if (
            isinstance(entry, dict)
            and entry.get(K_CLASS_NUMBER) == number
            and entry.get(K_CLASS_LETTER) == letter
        ):
            return entry
    return None


def ensure_classes(client, plan, auth):
    """Create any plan classes that do not exist and record their ids.

    Stores the service identifier of each class under ``class_spec["id"]``.

    Raises:
        CheckerError: ``service_corrupt`` when creation does not return
            201 or a class identifier cannot be read.
    """
    classes_payload = fetch_classes(client, auth)
    for class_spec in plan["classes"]:
        existing = find_class(
            classes_payload,
            class_spec["number"],
            class_spec["letter"],
        )
        if existing is None:
            payload = {
                K_CLASS_NUMBER: class_spec["number"],
                K_CLASS_LETTER: class_spec["letter"],
            }
            status, body, _ = client.request(
                "POST",
                "/api/classes",
                json_body=payload,
                headers=auth,
            )
            if status != 201:
                raise CheckerError(
                    service_corrupt,
                    f"class create failed with status {status}",
                )
            created = parse_json_payload(body, "class create")
            class_spec["id"] = extract_identifier(created, "created class")
            # Remember the new class so a later spec with the same
            # number/letter reuses it instead of creating a duplicate.
            classes_payload.append(created)
        else:
            class_spec["id"] = extract_identifier(existing, "class")


def fetch_students(client, auth):
    """Return the list of students from ``GET /api/students``."""
    return fetch_collection(
        client, auth, "/api/students", K_STUDENTS, "students"
    )


def ensure_students(client, plan, auth):
    """Create any plan students that do not exist and record their ids.

    Students are matched by SNILS.  Stores the service identifier of each
    student under ``spec["id"]``.

    Raises:
        CheckerError: ``service_corrupt`` when creation does not return
            201, a student is still absent afterwards, or an identifier
            cannot be read.
    """
    student_specs = list(all_students(plan))
    if not student_specs:
        return
    students_payload = fetch_students(client, auth)
    snils_map = {
        entry.get(K_SNILS): entry
        for entry in students_payload
        if isinstance(entry, dict)
    }
    missing = [
        spec for spec in student_specs if spec["snils"] not in snils_map
    ]
    for spec in missing:
        payload = {
            K_FIRST_NAME: spec["first"],
            K_LAST_NAME: spec["last"],
            K_MIDDLE_NAME: spec["middle"],
            K_PASSWORD: spec["password"],
            K_CONFIRM_PASSWORD: spec["password"],
            K_SNILS: spec["snils"],
            K_PASSPORT: spec["passport"],
        }
        status, _, _ = client.request(
            "POST",
            "/api/students",
            json_body=payload,
            headers=auth,
        )
        if status != 201:
            raise CheckerError(
                service_corrupt,
                f"student create failed with status {status}",
            )
    if missing:
        # Re-read the list to pick up the identifiers of new students.
        students_payload = fetch_students(client, auth)
        snils_map = {
            entry.get(K_SNILS): entry
            for entry in students_payload
            if isinstance(entry, dict)
        }
    for spec in student_specs:
        entry = snils_map.get(spec["snils"])
        if entry is None:
            raise CheckerError(
                service_corrupt,
                "student missing after creation",
            )
        spec["id"] = extract_identifier(entry, "student")


def add_students_to_classes(client, plan, auth):
    """Assign every plan student to its class.

    Statuses 201 and 422 are both accepted (422 presumably means the
    student is already assigned -- TODO confirm against the service).

    Raises:
        CheckerError: ``service_corrupt`` when an id is missing or the
            service answers with any other status.
    """
    for class_spec in plan["classes"]:
        class_id = class_spec.get("id")
        if class_id is None:
            raise CheckerError(
                service_corrupt,
                "class id missing during assignment",
            )
        for student in class_spec["students"]:
            student_id = student.get("id")
            if student_id is None:
                raise CheckerError(
                    service_corrupt,
                    "student id missing during assignment",
                )
            path = f"/api/classes/{class_id}/students/{student_id}"
            status, _, _ = client.request("POST", path, headers=auth)
            if status not in (201, 422):
                raise CheckerError(
                    service_corrupt,
                    f"assignment failed with status {status}",
                )


def fetch_lessons(client, auth, class_id):
    """Return the lessons from ``GET /api/classes/<class_id>/lessons``."""
    return fetch_collection(
        client,
        auth,
        f"/api/classes/{class_id}/lessons",
        K_LESSONS,
        "lessons",
    )


def match_lesson(entry, lesson_spec):
    """Return whether *entry* is a dict with the spec's date and title."""
    return (
        isinstance(entry, dict)
        and entry.get(K_LESSON_DATE) == lesson_spec["date"]
        and entry.get(K_LESSON_TITLE) == lesson_spec["title"]
    )


def ensure_lessons(client, plan, auth):
    """Create any plan lessons that do not exist in their class.

    Raises:
        CheckerError: ``service_corrupt`` when a class id is missing or
            lesson creation does not return 201.
    """
    for class_spec in plan["classes"]:
        class_id = class_spec.get("id")
        if class_id is None:
            raise CheckerError(service_corrupt, "class id missing for lessons")
        lessons_payload = fetch_lessons(client, auth, class_id)
        for lesson_spec in class_spec["lessons"]:
            if any(
                match_lesson(entry, lesson_spec)
                for entry in lessons_payload
            ):
                continue
            payload = {
                K_LESSON_DATE: lesson_spec["date"],
                K_LESSON_TITLE: lesson_spec["title"],
                K_LESSON_HOMEWORK: lesson_spec["homework"],
            }
            status, body, _ = client.request(
                "POST",
                f"/api/classes/{class_id}/lessons",
                json_body=payload,
                headers=auth,
            )
            if status != 201:
                raise CheckerError(
                    service_corrupt,
                    f"lesson create failed with status {status}",
                )
            created = parse_json_payload(body, "lesson create")
            lessons_payload.append(created)


def perform_put(client, plan):
    """Seed the service with everything described by *plan*."""
    auth = ensure_teacher(client, plan)
    ensure_classes(client, plan, auth)
    ensure_students(client, plan, auth)
    add_students_to_classes(client, plan, auth)
    ensure_lessons(client, plan, auth)


def verify_classes(client, plan, auth):
    """Check that every plan class and each of its lessons exists.

    Raises:
        CheckerError: ``service_corrupt`` when a class or lesson is
            missing or a class identifier cannot be read.
    """
    classes_payload = fetch_classes(client, auth)
    for class_spec in plan["classes"]:
        entry = find_class(
            classes_payload,
            class_spec["number"],
            class_spec["letter"],
        )
        if entry is None:
            raise CheckerError(service_corrupt, "expected class missing")
        class_spec["id"] = extract_identifier(entry, "class")
        lessons_payload = fetch_lessons(client, auth, class_spec["id"])
        for lesson_spec in class_spec["lessons"]:
            if not any(
                match_lesson(item, lesson_spec) for item in lessons_payload
            ):
                raise CheckerError(service_corrupt, "expected lesson missing")


def verify_students(client, plan, auth):
    """Check that every plan student exists with the expected fields.

    Students are looked up by SNILS; names, passport and username must
    match, and the flagged student's SNILS must equal the flag.

    Raises:
        CheckerError: ``service_corrupt`` on the first mismatch.
    """
    students_payload = fetch_students(client, auth)
    snils_map = {
        entry.get(K_SNILS): entry
        for entry in students_payload
        if isinstance(entry, dict)
    }
    for student in all_students(plan):
        entry = snils_map.get(student["snils"])
        if entry is None:
            raise CheckerError(service_corrupt, "expected student missing")
        if entry.get(K_FIRST_NAME) != student["first"]:
            raise CheckerError(service_corrupt, "student first name mismatch")
        if entry.get(K_LAST_NAME) != student["last"]:
            raise CheckerError(service_corrupt, "student last name mismatch")
        if entry.get(K_MIDDLE_NAME) != student["middle"]:
            raise CheckerError(service_corrupt, "student middle name mismatch")
        if entry.get(K_PASSPORT) != student["passport"]:
            raise CheckerError(service_corrupt, "student passport mismatch")
        if entry.get(K_USERNAME) != student["username"]:
            raise CheckerError(service_corrupt, "student username mismatch")
        if student["is_flagged"] and entry.get(K_SNILS) != plan["flag"]:
            raise CheckerError(service_corrupt, "flag storage mismatch")


def perform_check(client, plan):
    """Log in as the plan's teacher and verify classes and students.

    Raises:
        CheckerError: ``service_corrupt`` when the login is rejected or
            any verification step fails.
    """
    teacher = plan["teacher"]
    if not login_teacher(client, teacher):
        raise CheckerError(service_corrupt, "teacher credentials rejected")
    auth = make_auth_header(teacher)
    verify_classes(client, plan, auth)
    verify_students(client, plan, auth)


def main():
    """Parse the command line, run ``put`` or ``check``, exit with a verdict.

    The argument check lives here rather than at module level so that
    importing this module has no side effects.
    """
    if len(sys.argv) != 5:
        print(
            f"\nUsage:\n\t{sys.argv[0]} HOST (put|check) FLAG_ID FLAG\n",
            file=sys.stderr,
        )
        sys.exit(1)
    host = sys.argv[1]
    command = sys.argv[2]
    flag_id = sys.argv[3]
    flag = sys.argv[4]
    plan = generate_plan(flag_id, flag)
    client = ApiClient(host)
    try:
        if command == "put":
            perform_put(client, plan)
            perform_check(client, plan)
            service_up()
        elif command == "check":
            perform_check(client, plan)
            service_up()
        else:
            raise CheckerError(service_corrupt, f"unknown command {command!r}")
    except CheckerError as err:
        debug(f"failure: {err}")
        debug(traceback.format_exc())
        err.verdict()
    except Exception as exc:
        # Top-level boundary: anything unexpected is reported as mumble.
        # SystemExit raised by the verdict functions is not an Exception
        # and therefore passes through untouched.
        debug(f"unexpected error: {exc}")
        debug(traceback.format_exc())
        service_mumble()


if __name__ == "__main__":
    main()