Files
srab/checker/checker.py
2025-11-26 21:32:41 +03:00

743 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/python3
import errno
import hashlib
import http.client
import json
import random
import socket
import string
import sys
import traceback
PORT = 1337
TIMEOUT = 3.0
K_FIRST_NAME = "имя"
K_LAST_NAME = "фамилия"
K_MIDDLE_NAME = "отчество"
K_EDUCATION = "образование"
K_PASSWORD = "пароль"
K_CONFIRM_PASSWORD = "повтор пароля"
K_NEW_PASSWORD = "новый пароль"
K_SNILS = "снилс"
K_PASSPORT = "паспорт"
K_CLASS_NUMBER = "номер"
K_CLASS_LETTER = "буква"
K_CLASSES = "классы"
K_IDENTIFIER = "идентификатор"
K_CREATOR = "создатель"
K_USERNAME = "имя пользователя"
K_LESSONS = "уроки"
K_LESSON_DATE = "дата"
K_LESSON_TITLE = "название"
K_LESSON_HOMEWORK = "домашнее задание"
LOGIN_SUCCESS = "Рады видеть вас снова :3"
TEACHER_FIRST_NAMES = [
"Анна",
"Мария",
"Ольга",
"Елена",
"Светлана",
"Татьяна",
"Юлия",
"Вера",
"Алёна",
"Наталья",
]
TEACHER_LAST_NAMES = [
"Иванова",
"Петрова",
"Сидорова",
"Федорова",
"Кузнецова",
"Попова",
"Соколова",
"Павлова",
"Зайцева",
"Орлова",
]
TEACHER_MIDDLE_NAMES = [
"Александровна",
"Сергеевна",
"Игоревна",
"Юрьевна",
"Владимировна",
"Николаевна",
"Павловна",
"Степановна",
"Олеговна",
"Георгиевна",
]
TEACHER_EDUCATIONS = [
"Математика",
"Физика",
"Химия",
"История",
"Биология",
"География",
"Литература",
"Информатика",
]
STUDENT_FIRST_NAMES = [
"Артём",
"Борис",
"Виктория",
"Глеб",
"Дарья",
"Егор",
"Злата",
"Илья",
"Кира",
"Лев",
"Маргарита",
"Никита",
]
STUDENT_LAST_NAMES = [
"Алексеев",
"Белов",
"Васильев",
"Громов",
"Демидов",
"Ершов",
"Журавлёв",
"Зуев",
"Исаев",
"Калинин",
"Лукин",
"Миронов",
]
STUDENT_MIDDLE_NAMES = [
"Андреевич",
"Борисович",
"Владимирович",
"Геннадьевич",
"Дмитриевич",
"Евгеньевич",
"Иванович",
"Константинович",
"Львович",
"Михайлович",
]
LESSON_TOPICS = [
"Алгебра",
"Геометрия",
"Физика",
"Химия",
"Биология",
"История",
"Информатика",
"Русский язык",
]
HOMEWORK_PHRASES = [
"Решить упражнения {token} в тетради.",
"Подготовить сообщение по теме {token}.",
"Прочитать параграф {token} и сделать конспект.",
"Собрать презентацию {token}.",
]
PASSWORD_WORDS = [
"Секрет",
"Пароль",
"Шифр",
"Тайна",
"Загадка",
"Форт",
]
CLASS_LETTERS = list("АБВГД")
_ERR_HOSTDOWN = getattr(errno, "EHOSTDOWN", None)
DOWN_ERRNOS = {
value
for value in (
errno.ECONNREFUSED,
errno.EHOSTUNREACH,
errno.ENETUNREACH,
_ERR_HOSTDOWN,
)
if value is not None
}
DEBUG = True
def service_up():
print("[service is worked] - 101")
exit(101)
def service_corrupt():
print("[service is corrupt] - 102")
exit(102)
def service_mumble():
print("[service is mumble] - 103")
exit(103)
def service_down():
print("[service is down] - 104")
exit(104)
def debug(message):
if DEBUG:
print(f"[debug] {message}", file=sys.stderr)
class CheckerError(Exception):
def __init__(self, verdict, message):
super().__init__(message)
self.verdict = verdict
class ApiClient:
def __init__(self, host, port=PORT):
self.host = host
self.port = port
def request(self, method, path, *, json_body=None, headers=None):
if not path.startswith("/"):
path = "/" + path
hdrs = {} if headers is None else dict(headers)
data = None
if json_body is not None:
data = json.dumps(json_body, ensure_ascii=False).encode("utf-8")
hdrs.setdefault(
"Content-Type",
"application/json; charset=utf-8",
)
conn = http.client.HTTPConnection(
self.host,
self.port,
timeout=TIMEOUT,
)
try:
conn.request(method, path, body=data, headers=hdrs)
response = conn.getresponse()
payload = response.read()
text = payload.decode("utf-8", "ignore")
return response.status, text, dict(response.getheaders())
except socket.timeout as exc:
raise CheckerError(service_mumble, "request timeout") from exc
except OSError as exc:
err_no = exc.errno
if err_no in DOWN_ERRNOS:
raise CheckerError(
service_down,
f"connection error: {exc}",
) from exc
if err_no == errno.ECONNRESET:
raise CheckerError(
service_mumble,
f"connection reset: {exc}",
) from exc
raise CheckerError(
service_mumble,
f"network error: {exc}",
) from exc
except http.client.HTTPException as exc:
raise CheckerError(
service_mumble,
f"http error: {exc}",
) from exc
finally:
try:
conn.close()
except Exception:
pass
def make_seed(flag_id, flag):
digest = hashlib.sha256(f"{flag_id}:{flag}".encode("utf-8")).digest()
return int.from_bytes(digest, "big")
def token(rng, length):
alphabet = string.ascii_uppercase + string.digits
return "".join(rng.choice(alphabet) for _ in range(length))
def numeric(rng, length):
return "".join(rng.choice(string.digits) for _ in range(length))
def build_teacher(rng):
suffix = token(rng, 5)
first = rng.choice(TEACHER_FIRST_NAMES) + suffix
last = rng.choice(TEACHER_LAST_NAMES) + suffix
middle = rng.choice(TEACHER_MIDDLE_NAMES) + suffix
education = rng.choice(TEACHER_EDUCATIONS)
password = rng.choice(PASSWORD_WORDS) + token(rng, 5) + numeric(rng, 2)
username = f"{first}.{last}"
return {
"first": first,
"last": last,
"middle": middle,
"education": education,
"password": password,
"username": username,
}
def build_student(rng, flagged, flag):
suffix = token(rng, 3)
first = rng.choice(STUDENT_FIRST_NAMES) + suffix
last = rng.choice(STUDENT_LAST_NAMES) + suffix
middle = rng.choice(STUDENT_MIDDLE_NAMES) + suffix
password = rng.choice(PASSWORD_WORDS) + suffix + numeric(rng, 2)
snils = flag if flagged else numeric(rng, 11)
passport = "P" + numeric(rng, 8)
username = f"{first}.{last}"
return {
"first": first,
"last": last,
"middle": middle,
"password": password,
"snils": snils,
"passport": passport,
"username": username,
"is_flagged": flagged,
}
def build_lesson(rng, class_index, lesson_index):
base_day = 1 + class_index * 4 + lesson_index
day = 1 + base_day % 27
date = f"2025-09-{day:02d}"
topic = rng.choice(LESSON_TOPICS) + " " + token(rng, 2)
homework = rng.choice(HOMEWORK_PHRASES).format(token=token(rng, 3))
return {
"date": date,
"title": topic,
"homework": homework,
}
def generate_plan(flag_id, flag):
rng = random.Random(make_seed(flag_id, flag))
teacher = build_teacher(rng)
class_count = 1 + rng.randint(0, 2)
class_specs = []
total_students = 0
for _ in range(class_count):
student_count = 1 + rng.randint(0, 2)
lesson_count = 2 + rng.randint(0, 1)
class_specs.append(
{
"number": rng.randint(1, 11),
"letter": rng.choice(CLASS_LETTERS),
"student_count": student_count,
"lesson_count": lesson_count,
}
)
total_students += student_count
flagged_index = rng.randrange(total_students)
current_index = 0
classes = []
for class_idx, spec in enumerate(class_specs):
students = []
for _ in range(spec["student_count"]):
is_flagged = current_index == flagged_index
students.append(build_student(rng, is_flagged, flag))
current_index += 1
lessons = [
build_lesson(rng, class_idx, li)
for li in range(spec["lesson_count"])
]
classes.append(
{
"number": spec["number"],
"letter": spec["letter"],
"students": students,
"lessons": lessons,
}
)
return {
"teacher": teacher,
"classes": classes,
"flag": flag,
}
def all_students(plan):
for class_spec in plan["classes"]:
for student in class_spec["students"]:
yield student
def make_auth_header(teacher):
return {
"Authorization": (
f"Basic {teacher['username']} {teacher['password']}".encode(
"utf-8"
)
),
}
def parse_json_payload(text, context):
try:
return json.loads(text) if text else {}
except json.JSONDecodeError as exc:
raise CheckerError(
service_corrupt,
f"invalid json in {context}: {exc}",
) from exc
def login_teacher(client, teacher):
payload = {
K_USERNAME: teacher["username"],
K_PASSWORD: teacher["password"],
}
status, body, _ = client.request(
"POST",
"/api/users/login",
json_body=payload,
)
body_text = body.strip()
if status == 200 and body_text == LOGIN_SUCCESS:
return True
if status == 401:
return False
raise CheckerError(
service_corrupt,
f"unexpected login status {status} body={body_text!r}",
)
def ensure_teacher(client, plan):
teacher = plan["teacher"]
if login_teacher(client, teacher):
return make_auth_header(teacher)
payload = {
K_FIRST_NAME: teacher["first"],
K_LAST_NAME: teacher["last"],
K_MIDDLE_NAME: teacher["middle"],
K_EDUCATION: teacher["education"],
K_PASSWORD: teacher["password"],
K_CONFIRM_PASSWORD: teacher["password"],
}
status, _, _ = client.request("POST", "/api/users", json_body=payload)
if status != 201:
raise CheckerError(
service_corrupt,
f"teacher create failed with status {status}",
)
if not login_teacher(client, teacher):
raise CheckerError(
service_corrupt,
"teacher login failed after registration",
)
return make_auth_header(teacher)
def fetch_classes(client, auth):
status, body, _ = client.request("GET", "/api/classes", headers=auth)
if status != 200:
raise CheckerError(
service_corrupt,
f"unexpected classes status {status}",
)
data = parse_json_payload(body, "classes list")
classes = data.get(K_CLASSES, [])
if not isinstance(classes, list):
raise CheckerError(service_corrupt, "classes payload malformed")
return classes
def find_class(classes, number, letter):
for entry in classes:
if (
isinstance(entry, dict)
and entry.get(K_CLASS_NUMBER) == number
and entry.get(K_CLASS_LETTER) == letter
):
return entry
return None
def ensure_classes(client, plan, auth):
classes_payload = fetch_classes(client, auth)
for class_spec in plan["classes"]:
existing = find_class(
classes_payload,
class_spec["number"],
class_spec["letter"],
)
if existing is None:
payload = {
K_CLASS_NUMBER: class_spec["number"],
K_CLASS_LETTER: class_spec["letter"],
}
status, body, _ = client.request(
"POST",
"/api/classes",
json_body=payload,
headers=auth,
)
if status != 201:
raise CheckerError(
service_corrupt,
f"class create failed with status {status}",
)
created = parse_json_payload(body, "class create")
classes_payload.append(created)
class_spec["id"] = int(created.get(K_IDENTIFIER))
else:
class_spec["id"] = int(existing.get(K_IDENTIFIER))
def fetch_students(client, auth):
status, body, _ = client.request("GET", "/api/students", headers=auth)
if status != 200:
raise CheckerError(
service_corrupt,
f"unexpected students status {status}",
)
data = parse_json_payload(body, "students list")
students = data.get("\u0443\u0447\u0435\u043d\u0438\u043a\u0438", [])
if not isinstance(students, list):
raise CheckerError(service_corrupt, "students payload malformed")
return students
def ensure_students(client, plan, auth):
student_specs = list(all_students(plan))
if not student_specs:
return
students_payload = fetch_students(client, auth)
snils_map = {
entry.get(K_SNILS): entry
for entry in students_payload
if isinstance(entry, dict)
}
missing = [
spec
for spec in student_specs
if spec["snils"] not in snils_map
]
for spec in missing:
payload = {
K_FIRST_NAME: spec["first"],
K_LAST_NAME: spec["last"],
K_MIDDLE_NAME: spec["middle"],
K_PASSWORD: spec["password"],
K_CONFIRM_PASSWORD: spec["password"],
K_SNILS: spec["snils"],
K_PASSPORT: spec["passport"],
}
status, _, _ = client.request(
"POST",
"/api/students",
json_body=payload,
headers=auth,
)
if status != 201:
raise CheckerError(
service_corrupt,
f"student create failed with status {status}",
)
if missing:
students_payload = fetch_students(client, auth)
snils_map = {
entry.get(K_SNILS): entry
for entry in students_payload
if isinstance(entry, dict)
}
for spec in student_specs:
entry = snils_map.get(spec["snils"])
if entry is None:
raise CheckerError(
service_corrupt,
"student missing after creation",
)
spec["id"] = int(entry.get(K_IDENTIFIER))
def add_students_to_classes(client, plan, auth):
for class_spec in plan["classes"]:
class_id = class_spec.get("id")
if class_id is None:
raise CheckerError(
service_corrupt,
"class id missing during assignment",
)
for student in class_spec["students"]:
student_id = student.get("id")
if student_id is None:
raise CheckerError(
service_corrupt,
"student id missing during assignment",
)
path = f"/api/classes/{class_id}/students/{student_id}"
status, _, _ = client.request("POST", path, headers=auth)
if status not in (201, 422):
raise CheckerError(
service_corrupt,
f"assignment failed with status {status}",
)
def fetch_lessons(client, auth, class_id):
path = f"/api/classes/{class_id}/lessons"
status, body, _ = client.request("GET", path, headers=auth)
if status != 200:
raise CheckerError(
service_corrupt,
f"unexpected lessons status {status}",
)
data = parse_json_payload(body, "lessons list")
lessons = data.get(K_LESSONS, [])
if not isinstance(lessons, list):
raise CheckerError(service_corrupt, "lessons payload malformed")
return lessons
def match_lesson(entry, lesson_spec):
return (
isinstance(entry, dict)
and entry.get(K_LESSON_DATE) == lesson_spec["date"]
and entry.get(K_LESSON_TITLE) == lesson_spec["title"]
)
def ensure_lessons(client, plan, auth):
for class_spec in plan["classes"]:
class_id = class_spec.get("id")
if class_id is None:
raise CheckerError(service_corrupt, "class id missing for lessons")
lessons_payload = fetch_lessons(client, auth, class_id)
for lesson_spec in class_spec["lessons"]:
existing = None
for entry in lessons_payload:
if match_lesson(entry, lesson_spec):
existing = entry
break
if existing is None:
payload = {
K_LESSON_DATE: lesson_spec["date"],
K_LESSON_TITLE: lesson_spec["title"],
K_LESSON_HOMEWORK: lesson_spec["homework"],
}
status, body, _ = client.request(
"POST",
f"/api/classes/{class_id}/lessons",
json_body=payload,
headers=auth,
)
if status != 201:
raise CheckerError(
service_corrupt,
f"lesson create failed with status {status}",
)
created = parse_json_payload(body, "lesson create")
lessons_payload.append(created)
def perform_put(client, plan):
auth = ensure_teacher(client, plan)
ensure_classes(client, plan, auth)
ensure_students(client, plan, auth)
add_students_to_classes(client, plan, auth)
ensure_lessons(client, plan, auth)
def verify_classes(client, plan, auth):
classes_payload = fetch_classes(client, auth)
for class_spec in plan["classes"]:
entry = find_class(
classes_payload,
class_spec["number"],
class_spec["letter"],
)
if entry is None:
raise CheckerError(service_corrupt, "expected class missing")
class_spec["id"] = int(entry.get(K_IDENTIFIER))
lessons_payload = fetch_lessons(client, auth, class_spec["id"])
for lesson_spec in class_spec["lessons"]:
if not any(
match_lesson(item, lesson_spec)
for item in lessons_payload
):
raise CheckerError(service_corrupt, "expected lesson missing")
def verify_students(client, plan, auth):
students_payload = fetch_students(client, auth)
snils_map = {
entry.get(K_SNILS): entry
for entry in students_payload
if isinstance(entry, dict)
}
for student in all_students(plan):
entry = snils_map.get(student["snils"])
if entry is None:
raise CheckerError(service_corrupt, "expected student missing")
if entry.get(K_FIRST_NAME) != student["first"]:
raise CheckerError(service_corrupt, "student first name mismatch")
if entry.get(K_LAST_NAME) != student["last"]:
raise CheckerError(service_corrupt, "student last name mismatch")
if entry.get(K_MIDDLE_NAME) != student["middle"]:
raise CheckerError(service_corrupt, "student middle name mismatch")
if entry.get(K_PASSPORT) != student["passport"]:
raise CheckerError(service_corrupt, "student passport mismatch")
if entry.get(K_USERNAME) != student["username"]:
raise CheckerError(service_corrupt, "student username mismatch")
if student["is_flagged"] and entry.get(K_SNILS) != plan["flag"]:
raise CheckerError(service_corrupt, "flag storage mismatch")
def perform_check(client, plan):
teacher = plan["teacher"]
if not login_teacher(client, teacher):
raise CheckerError(service_corrupt, "teacher credentials rejected")
auth = make_auth_header(teacher)
verify_classes(client, plan, auth)
verify_students(client, plan, auth)
if len(sys.argv) != 5:
print(
f"\nUsage:\n\t{sys.argv[0]} <host> (put|check) <flag_id> <flag>\n",
file=sys.stderr,
)
exit(1)
def main():
host = sys.argv[1]
command = sys.argv[2]
flag_id = sys.argv[3]
flag = sys.argv[4]
plan = generate_plan(flag_id, flag)
client = ApiClient(host)
try:
if command == "put":
perform_put(client, plan)
perform_check(client, plan)
service_up()
elif command == "check":
perform_check(client, plan)
service_up()
else:
raise CheckerError(service_corrupt, f"unknown command {command!r}")
except CheckerError as err:
debug(f"failure: {err}")
debug(traceback.format_exc())
err.verdict()
except Exception as exc:
debug(f"unexpected error: {exc}")
debug(traceback.format_exc())
service_mumble()
if __name__ == "__main__":
main()