#!/usr/bin/env python3
"""WebSocket server for a two-team code-guessing game.

Clients send JSON objects of the form ``{"action": NAME, ...}``; the remaining
keys are passed as keyword arguments to ``Connection.do_NAME``.  The server
answers with JSON objects carrying a ``"type"`` key (``"welcome"``,
``"event"``, ``"error"`` or ``"return"``).

A plain HTTP request for ``/stats`` returns the number of live games.
"""
import asyncio
import dataclasses
import http
import json
import logging
import random
import secrets
import sys
import traceback
import weakref

import websockets

logger = logging.getLogger('websockets')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

# Games by id.  Only weak references are held here: a game stays alive as long
# as some connection still references it through ``Connection.game``.
games = weakref.WeakValueDictionary()

# Strong references to fire-and-forget tasks.  The event loop itself only
# keeps weak references, so an unreferenced task may be collected before it
# finishes.
_background_tasks = set()


def _spawn(coro):
    """Schedule *coro* as a task and keep it referenced until it is done."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


def _require(condition, message=""):
    """Raise ``AssertionError(message)`` unless *condition* is truthy.

    Used instead of ``assert`` for validating client requests, because
    ``assert`` statements are removed when Python runs with ``-O``.
    """
    if not condition:
        raise AssertionError(message)


class Handler:
    """Base class for one websocket connection: framing and dispatch."""

    @classmethod
    async def handle(cls, websocket, path):
        """Entry point for ``websockets.serve``: serve one connection."""
        print("yay, a connection!", websocket)
        c = cls(websocket, path)
        return await c.recv_loop()

    def __init__(self, websocket, path):
        self.websocket = websocket
        self.path = path
        self.loop = asyncio.get_event_loop()
        self.id = secrets.token_hex(4)
        # Read by on_close(); subclasses fill these in once the player joins.
        self.game = None
        self.team = None

    async def send(self, type, data):
        """Send *data* (a mapping) as JSON with an added ``"type"`` key."""
        msg = dict(data)
        msg["type"] = type
        await self.websocket.send(json.dumps(msg))

    async def welcome(self):
        """Tell the client its connection id."""
        print("sending welcome, id", self.id)
        await self.send("welcome", {"id": self.id})
        print("welcome sent")

    async def recv_loop(self):
        """Receive and dispatch messages until the connection closes.

        A message containing an ``"action"`` key is dispatched to the method
        ``do_<action>`` with the remaining keys as keyword arguments.  Any
        exception raised while handling a message is reported to the client
        as ``{"type": "error", "exception": str(e)}``.  ``on_close`` is called
        exactly once when the loop ends, however it ends.
        """
        await self.welcome()
        try:
            while not self.websocket.closed:
                try:
                    raw = await self.websocket.recv()
                    message = json.loads(raw)
                    if "action" in message:
                        method = getattr(self, "do_" + message["action"])
                        message.pop("action")
                        result = method(**message)
                        if result:
                            result["type"] = "return"
                            await self.websocket.send(json.dumps(result))
                except websockets.ConnectionClosed:
                    traceback.print_exc()
                    break
                except Exception as e:
                    traceback.print_exc()
                    try:
                        await self.websocket.send(
                            json.dumps({"type": "error", "exception": str(e)}))
                    except websockets.ConnectionClosed:
                        # The peer vanished while we were reporting the error.
                        break
        finally:
            self.on_close()

    def on_close(self):
        """Detach from game and team and announce the departure once."""
        print("aand gone", self)
        print(vars(self))
        if self.game is not None:
            try:
                self.game.unregister(self)
            except Exception:
                traceback.print_exc()
                print("strange unregister bug", file=sys.stderr)
        if self.team is not None:
            try:
                self.team.unregister(self)
            except Exception:
                print("strange unregister bug", file=sys.stderr)
                traceback.print_exc()
        if self.game is not None:
            self.game.post_event("left_game", id=self.id)


class EventChannel:
    """Broadcast channel that replays its event history to new members."""

    def __init__(self):
        # Weak references, so a dropped connection does not linger here.
        self.connections = weakref.WeakSet()
        self.events = []

    def post_event(self, etype, **fields):
        """Record an event and send it to every registered connection."""
        event = dict(fields)
        event["event"] = etype
        self.events.append(event)
        for c in self.connections:
            _spawn(c.send("event", event))

    def register(self, connection):
        """Add *connection* and send it all events posted so far."""
        self.connections.add(connection)
        for event in self.events:
            _spawn(connection.send("event", event))

    def unregister(self, connection):
        """Remove *connection*; raises ``KeyError`` if it is not registered."""
        self.connections.remove(connection)


class Team(EventChannel):
    """One team: its words, marker counters and per-turn code state."""

    def __init__(self):
        super().__init__()
        self.words = None
        self.white_markers = 0
        self.black_markers = 0
        # Define the per-turn attributes right away so they always exist.
        self.newturn()

    def init(self, words):
        """Assign the team's words and reset markers and turn state."""
        self.words = words
        self.white_markers = 0
        self.black_markers = 0
        self.newturn()

    def newturn(self):
        """Clear all per-turn state."""
        self.code = None
        self.code_owner = None
        self.own_guess = None
        self.opponents_guess = None
        self.hints = None
        self.revealed = False

    def gencode(self):
        """Draw a new code: three distinct numbers out of 1..4, in order."""
        self.code = list(random.sample([1, 2, 3, 4], 3))
        return self.code

    def unregister(self, connection):
        """Remove *connection* from the team.

        If the leaving player holds the code and has not given hints yet, the
        code is reset so that another player can take one.
        """
        print("in team unregister", file=sys.stderr)
        print(vars(self), file=sys.stderr)
        print("checking for code owner", file=sys.stderr)
        print(vars(self), vars(connection), file=sys.stderr)
        if self.hints is None and self.code_owner == connection.id:
            print("is code owner", file=sys.stderr)
            self.code_owner = None
            self.code = None
            print(vars(self), vars(connection), file=sys.stderr)
        try:
            super().unregister(connection)
        except Exception:
            print("strange unregister bug", file=sys.stderr)
            traceback.print_exc()


class Game(EventChannel):
    """A game with two teams; registers itself in ``games`` on creation."""

    # Word list, read once when the class is defined.  Blank lines are
    # skipped so that no team is handed an empty word.
    with open("words.txt") as f:
        WORDS = [w.strip().upper() for w in f.read().splitlines() if w.strip()]

    def __init__(self):
        super().__init__()
        self.teams = (Team(), Team())
        self.turn = None
        self.running = False
        # Draw ids until one is free.
        self.id = secrets.token_hex(4)
        while self.id in games:
            self.id = secrets.token_hex(4)
        games[self.id] = self
        self.post_event("created_game", gameid=self.id)

    def start(self):
        """Deal four words to each team and begin turn 1."""
        words = random.sample(Game.WORDS, 8)
        words = (words[0:4], words[4:8])
        print(words)
        for t, ws in zip(self.teams, words):
            print(t, ws)
            t.post_event("words", words=ws)
            t.init(ws)
        self.running = True
        self.post_event("started_game")
        self.start_turn(1)

    def start_turn(self, turn):
        """Reset both teams' turn state and announce turn number *turn*."""
        self.turn = turn
        for t in self.teams:
            t.newturn()
        self.post_event("new_turn", turn=turn)

    def progress(self):
        """Reveal finished teams' codes; start the next turn when both are done.

        A team is finished when both guesses on its code are in; in turn 1 the
        team's own guess alone is enough.
        """
        finished = 0
        for teamid, t in enumerate(self.teams):
            both_guessed = t.own_guess and t.opponents_guess
            first_turn_done = self.turn == 1 and t.own_guess
            if both_guessed or first_turn_done:
                finished += 1
                if not t.revealed:
                    self.post_event("reveal", teamid=teamid,
                                    own_guess=t.own_guess,
                                    opponents_guess=t.opponents_guess,
                                    code=t.code, turn=self.turn)
                    t.revealed = True
        # TODO: winning and losing
        if finished == 2:
            self.start_turn(self.turn + 1)


class Connection(Handler):
    """A player's connection; each ``do_*`` method is a client action."""

    def __init__(self, websocket, path):
        super().__init__(websocket, path)
        self.name = None
        self.team = None
        self.game = None

    def _team_by_id(self, teamid):
        """Return the team with index *teamid*, rejecting anything else."""
        valid = (isinstance(teamid, int) and not isinstance(teamid, bool)
                 and 0 <= teamid < len(self.game.teams))
        _require(valid, "invalid team id")
        return self.game.teams[teamid]

    def do_set_name(self, name):
        """Set the player's display name."""
        self.name = name

    def do_create_game(self):
        """Create a new game and join it; requires a name."""
        _require(self.name, "set a name first")
        self.game = Game()  # reference keeps game alive
        self._joined_game()

    def do_join_game(self, gameid):
        """Join the existing game *gameid*; requires a name."""
        _require(self.name, "set a name first")
        self.game = games[gameid]  # reference keeps game alive
        self._joined_game()

    def _joined_game(self):
        """Subscribe to the game's events and announce the new player."""
        self.game.register(self)
        self.game.post_event("joined_game", playerid=self.id, name=self.name)

    def do_join_team(self, teamid):
        """Join team *teamid*; switching teams is only allowed before start."""
        _require(self.game, "join a game first")
        _require(not self.team or not self.game.running,
                 "cannot switch teams while the game is running")
        # Resolve the target first so a bad id leaves the old membership intact.
        new_team = self._team_by_id(teamid)
        if self.team is not None:
            self.team.unregister(self)
        self.team = new_team
        self.team.register(self)
        self.game.post_event("joined_team", playerid=self.id, teamid=teamid)

    def do_take_code(self, turn, override = False):
        """Draw this turn's code for the player's team.

        override: take code if somebody else already did so;
        useful if the other one left the game.
        """
        _require(self.game is not None and self.game.running,
                 "game is not running")
        _require(self.team, "join a team first")
        _require(turn == self.game.turn, "wrong turn")
        if self.team.code is not None and not override:
            raise RuntimeError("Es hat schon jemand eine Codekarte")
        code = self.team.gencode()
        self.team.code_owner = self.id
        self.game.post_event("has_code", id=self.id, turn=turn,
                             teamid=self.teamid())
        # The code itself goes to the code owner only.
        _spawn(self.send("event",
                         {"event": "secret_code", "code": code, "turn": turn}))

    def do_hints(self, hints, turn):
        """Publish the code owner's three hints to the whole game."""
        _require(self.game is not None and self.team is not None,
                 "join a game and a team first")
        _require(turn == self.game.turn, "wrong turn")
        _require(self.id == self.team.code_owner, "not the code owner")
        _require(self.team.hints is None, "hints already given")
        _require(len(hints) == 3, "exactly three hints required")
        _require(all((h and isinstance(h, str)) for h in hints),
                 "hints must be non-empty strings")
        self.team.hints = hints
        self.game.post_event("hints", turn=self.game.turn,
                             hints=self.team.hints, teamid=self.teamid())

    def do_guess(self, guess, turn, teamid):
        """Record a guess at team *teamid*'s code and hand out markers.

        A wrong guess at the own team's code gives the player's team a black
        marker; a correct guess at the other team's code gives it a white one.
        Guessing the other team's code is not allowed in turn 1.
        """
        _require(self.game is not None and self.team is not None,
                 "join a game and a team first")
        guessteam = self._team_by_id(teamid)
        own = guessteam is self.team
        _require(turn == self.game.turn, "wrong turn")
        _require(len(guess) == 3, "guess must have three entries")
        _require(len(set(guess)) == 3, "guess entries must be distinct")
        _require(all(g in [1, 2, 3, 4] for g in guess),
                 "guess entries must be 1-4")
        _require(own or turn != 1, "no guessing the other code in turn 1")
        if own:
            _require(guessteam.own_guess is None, "already guessed")
            guessteam.own_guess = guess
        else:
            _require(guessteam.opponents_guess is None,
                     "already guessed " + str(guessteam.opponents_guess))
            guessteam.opponents_guess = guess
        correct = guess == guessteam.code
        publish_guess = {"own_guess" if own else "opponents_guess": guess}
        self.team.post_event("reveal", teamid=teamid, code=guessteam.code,
                             turn=self.game.turn, **publish_guess)
        if own and not correct:
            self.team.black_markers += 1
            self.game.post_event("marker", teamid=self.teamid(), turn=turn,
                                 color="black")
        if not own and correct:
            self.team.white_markers += 1
            self.game.post_event("marker", teamid=self.teamid(), turn=turn,
                                 color="white")
        self.game.progress()

    def do_start_game(self):
        """Start the joined game if it is not running yet."""
        _require(self.game, "join a game first")
        _require(not self.game.running, "game is already running")
        self.game.start()

    def teamid(self):
        """Return the index of the player's team within the game."""
        return self.game.teams.index(self.team)

    def opponent(self):
        """Return the team the player is not on."""
        return self.game.teams[1 - self.game.teams.index(self.team)]


def http_handler(path, request):
    """Answer ``/stats`` over plain HTTP; return None for everything else.

    Returning None lets the websocket handshake proceed.
    """
    if path == "/stats":
        stats = "{} game(s) ongoing\n".format(len(games))
        return (http.HTTPStatus.OK,
                [("Content-Type", "text/plain"),
                 ("Access-Control-Allow-Origin", "*")],
                stats.encode())
    # else it's the web socket
    return None


start_server = websockets.serve(
    Connection.handle, "", 8765,
    extra_headers={"Access-Control-Allow-Origin": "*"},
    process_request=http_handler)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()