use warp::Filter; use warp::path; use tokio; use tokio::sync::RwLock; use std::sync::{Arc, Weak}; use tokio::sync::mpsc; use std::sync::atomic::AtomicU64; use futures::{StreamExt, SinkExt}; //use serde::{Serialize,Deserialize}; mod msg; struct User{ //id: u64, cursor: Option, tx: mpsc::UnboundedSender, } type UserMap = RwLock>; #[derive(Default)] struct Channel { state: RwLock>, users: UserMap, } impl Channel { async fn register(&self, uid:u64, u: User){ println!("registering {}", uid); { let rstate = self.state.read().await; if let Some(s) = rstate.as_ref() { let _sendres = u.tx.send(s.to_string()); } } { let mut wusers = self.users.write().await; let others = wusers.iter().map(|(&other_uid, _)| other_uid).collect(); let userinfo = msg::Header::UsersInfo{my_uid: uid, others}; let _sendres = u.tx.send(format!("{}\0{}", 0, serde_json::to_string(&userinfo).unwrap())); for (&other_uid, other_user) in wusers.iter(){ if let Some(c) = other_user.cursor.as_ref() { let _sendres = u.tx.send(format!("{}\0{}", other_uid, c)); } let _sendres = other_user.tx.send(format!("{}\0{}", uid, serde_json::to_string(&msg::Header::Join).unwrap())); } wusers.insert(uid, u); } println!("registered {}", uid); } async fn unregister(&self, uid: u64){ println!("unregistering {}", uid); let mut wusers = self.users.write().await; wusers.remove(&uid); for (_other_uid, other_user) in wusers.iter(){ let _sendres = other_user.tx.send(format!("{}\0{}", uid, serde_json::to_string(&msg::Header::Leave).unwrap())); } wusers.remove(&uid); println!("unregistered {}", uid); } } type ChannelMap = RwLock>>; static NEXT_USER_ID: AtomicU64 = AtomicU64::new(1); async fn load_chan(state: &ChannelMap, channel_name: &String) -> Arc { let statemap = state.read().await; let res = statemap.get(channel_name); if let Some(w) = res { if let Some(a) = w.upgrade() { return a; } } drop(statemap); let mut statemap = state.write().await; if let Some(w) = statemap.get(channel_name) { if let Some(a) = w.upgrade() { return a; } } println!("creating channel {}",channel_name); let chan = Arc::::default(); statemap.insert(channel_name.to_owned(), Arc::downgrade(&chan)); return chan; } async fn handler(channel_name: String, socket: warp::ws::WebSocket, state: &ChannelMap) { // user id mod 2**53 for javascript (f64) compatibility let my_uid = NEXT_USER_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % (1<<53); // register in channel let chan = load_chan(state, &channel_name).await; let (mut stx, mut srx) = socket.split(); let (ctx, mut crx) = mpsc::unbounded_channel(); let u = User{cursor: None, tx: ctx}; chan.register(my_uid, u).await; // start background task to send out queued messages from channel let sender = async move { while let Some(s) = crx.recv().await { let m = warp::filters::ws::Message::text(s); let res = stx.send(m).await; if res.is_err() { break; } } }; tokio::task::spawn(sender); // relay messages into channel while let Some(Ok(m)) = srx.next().await { println!("{:?} m: {:?}", channel_name, m); if let Ok(s) = m.to_str() { let info = if let Ok(h) = msg::parse_header(s) { h} else { continue; }; use msg::Header::*; match info { State(meta) => { let mut state = chan.state.write().await; if let Some(Ok((_olduid,State(oldmeta)))) = state.as_ref().map(|m| msg::parse_uid_header(m)) { if (oldmeta.generation, oldmeta.sender_power) >= (meta.generation, meta.sender_power) { // this state would be overruled anyways, so no need to send it continue; } } *state = Some(format!("{}\0{}",my_uid,s)); } /*msg::Header::Init => { if chan.state.read().await.is_some() { continue; } let mut state = chan.state.write().await; *state = Some(format!("{}\0{}",my_uid,s)); // don't broadcast, other users will become it as welcome message continue; }*/ Cursor => { let mut usersw = chan.users.write().await; let mut me = usersw.get_mut(&my_uid).expect("lookup current user"); me.cursor = Some(s.to_string()); } // drop other messages, they are not to be sent to us _ => { continue; } } for (&uid,user) in chan.users.read().await.iter() { if uid == my_uid { continue; } let _res = user.tx.send(format!("{}\0{}", my_uid, s)); } } } // unregister from channel chan.unregister(my_uid).await; } #[tokio::main] async fn main() { simple_logger::SimpleLogger::new().with_level(log::LevelFilter::Info).init().unwrap(); let state : &'static _= Box::leak( Box::new(ChannelMap::default())); let sockets = path!("channel"/String).and(warp::ws()).map(move |channel: String, ws: warp::ws::Ws| ws.on_upgrade(move |socket| handler(channel, socket, state))); let dir = warp::fs::dir("www"); let log = warp::filters::log::log("www"); let app = sockets.or(dir).with(log); warp::serve(app).run(([127,0,0,1], 3030)).await; }