//! Ws Sansad manage websocket of each client use actix::prelude::*; use actix_broker::{Broker, SystemBroker}; use actix_web_actors::ws; use ms::Resp; use serde_json::{json, Value}; use std::time::{Duration, Instant}; use crate::{chat_pinnd::ChatPinnd, messages as ms, validator::{Validation as vl, validate}}; /// How often heartbeat pings are sent const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); /// How long before lack of client response causes a timeout const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); pub struct WsSansad { kunjika: String, isthiti: Isthiti, addr: Option>, hb: Instant } #[derive(Debug)] enum Isthiti { None, Grih(String), VraktigatWaitlist } impl Actor for WsSansad { type Context = ws::WebsocketContext; fn started(&mut self, ctx: &mut Self::Context) { self.addr = Some(ctx.address().clone()); // own addr self.hb(ctx); } fn stopping(&mut self, _: &mut Self::Context) -> Running { futures::executor::block_on(self.leave_grih()); // notify leaving Running::Stop } } /// manage stream impl StreamHandler> for WsSansad { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { match msg { Ok(ws::Message::Ping(msg)) => { ctx.ping(&msg); self.hb = Instant::now(); }, Ok(ws::Message::Pong(_)) => { self.hb = Instant::now(); }, Ok(ws::Message::Text(msg)) => { futures::executor::block_on(self.parse_text_handle(msg)); }, Ok(ws::Message::Close(msg)) => { ctx.close(msg); ctx.stop(); } _ => ctx.stop() } } } /// send text message impl Handler for WsSansad { type Result = (); fn handle(&mut self, msg: ms::WsText, ctx: &mut Self::Context) -> Self::Result { let json = json!({ "cmd": "text", "text": msg.text, "reply": msg.reply, "kunjika": msg.sender_kunjika // Sender's kunjuka }); ctx.text(json.to_string()); } } /// send text status impl Handler for WsSansad { type Result = (); fn handle(&mut self, msg: ms::WsStatus, ctx: &mut Self::Context) -> Self::Result { let json = json!({ "cmd": "status", "status": msg.status, "kunjika": msg.sender_kunjika // Sender's kunjuka }); ctx.text(json.to_string()); } } /// List Vayakti impl Handler for WsSansad { type Result = (); fn handle(&mut self, msg: ms::WsList, ctx: &mut Self::Context) -> Self::Result { let json = json!({ "cmd": "list", "vayakti": msg.json }); ctx.text(json.to_string()); } } /// send response ok, error impl Handler for WsSansad { type Result = (); fn handle(&mut self, msg: ms::WsResponse, ctx: &mut Self::Context) -> Self::Result { let json = json!({ "cmd": "resp", "result": msg.result, "message": msg.message }); ctx.text(json.to_string()); } } /// notify someone got connected impl Handler for WsSansad { type Result = (); fn handle(&mut self, msg: ms::WsConnected, ctx: &mut Self::Context) -> Self::Result { let json = json!({ "cmd": "connected", "name": msg.name, "kunjika": msg.kunjika }); ctx.text(json.to_string()); } } /// notify someone got disconnected impl Handler for WsSansad { type Result = (); fn handle(&mut self, msg: ms::WsDisconnected, ctx: &mut Self::Context) -> Self::Result { let json = json!({ "cmd": "disconnected", "name": msg.name, "kunjika": msg.kunjika }); ctx.text(json.to_string()); } } /// notify got connected to random person impl Handler for WsSansad { type Result = (); fn handle(&mut self, msg: ms::WsConnectedRandom, ctx: &mut Self::Context) -> Self::Result { self.isthiti = Isthiti::Grih(msg.grih_kunjika); let json = json!({ "cmd": "random", "name": msg.name, "kunjika": msg.kunjika }); ctx.text(json.to_string()); } } impl WsSansad { pub fn new() -> Self { WsSansad { kunjika: String::new(), isthiti: Isthiti::None, addr: None, hb: Instant::now() } } /// helper method that sends ping to client every second. /// /// also this method checks heartbeats from client fn hb(&self, ctx: &mut ::Context) { ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { // check client heartbeats if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { // heartbeat timed out // stop actor ctx.stop(); // don't try to send a ping return; } ctx.ping(b""); }); } /// parse the request text from client async fn parse_text_handle(&mut self, msg: String) { if let Ok(val) = serde_json::from_str::(&msg) { match val.get("cmd").unwrap().as_str().unwrap() { "join" => { self.join_grih(val).await }, "rand" => { self.join_random(val).await }, "randnext" => { self.join_random_next().await }, "text" => { self.send_text(val).await }, "status" => { self.send_status(val).await }, "list" => { self.list().await }, "leave" => { self.leave_grih().await }, _ => () } } } /// send ok response fn send_ok_response(&self, text: &str) { self.addr.clone().unwrap().do_send(ms::WsResponse { result: "Ok".to_owned(), message: text.to_owned() }); } /// send error response fn send_err_response(&self, text: &str) { self.addr.clone().unwrap().do_send(ms::WsResponse { result: "Err".to_owned(), message: text.to_owned() }); } /// Request for joining to random person async fn join_random(&mut self, val: Value) { // Check is already joined match self.isthiti { Isthiti::None => (), Isthiti::VraktigatWaitlist => { self.send_ok_response("watchlist"); return; }, Isthiti::Grih(_) => return } let kunjika = match val.get("kunjika") { Some(val ) => val.as_str().unwrap().to_owned(), None => { self.send_err_response("Invalid request"); return; } }; let name = match val.get("name") { Some(val ) => val.as_str().unwrap().to_owned(), None => { self.send_err_response("Invalid request"); return; } }; let tags = match val.get("tags") { Some(val ) => { let mut v = Vec::new(); for x in val.as_str().unwrap().split_ascii_whitespace() { v.push(x.to_owned()); } v }, None => { Vec::new() } }; // Validate if let Some(val ) = validate(vec![vl::NonEmpty, vl::NoSpace, vl::NoHashtag], &kunjika, "Kunjika") { self.send_err_response(&val); return; } else if let Some(val ) = validate(vec![vl::NonEmpty], &name, "Name") { self.send_err_response(&val); return; } // request let result: Resp = ChatPinnd::from_registry().send(ms::JoinRandom{ addr: self.addr.clone().unwrap(), kunjika: kunjika.to_owned(), name, tags }).await.unwrap(); match result { Resp::Err(err) => self.send_err_response(&err), Resp::Ok => self.kunjika = kunjika, Resp::None => { self.addr.clone().unwrap().do_send(ms::WsResponse{ result: "watch".to_owned() , message: "Watchlist".to_owned() }); self.isthiti = Isthiti::VraktigatWaitlist; self.kunjika = kunjika } } } /// Request for joining to random person async fn join_random_next(&mut self) { // Check is already joined let grih_kunjika = match &self.isthiti { Isthiti::VraktigatWaitlist => { self.send_ok_response("watchlist"); return; }, Isthiti::Grih(grih_kunjika) => grih_kunjika, Isthiti::None => { self.send_ok_response("Not allowed"); return; } }; // request let result: Resp = ChatPinnd::from_registry().send(ms::JoinRandomNext{ kunjika: self.kunjika.to_owned(), grih_kunjika: grih_kunjika.to_owned(), }).await.unwrap(); match result { Resp::Err(err) => self.send_err_response(&err), Resp::None => { self.addr.clone().unwrap().do_send(ms::WsResponse{ result: "watch".to_owned() , message: "Watchlist".to_owned() }); self.isthiti = Isthiti::VraktigatWaitlist; self.kunjika = self.kunjika.to_owned() } _ => () } } /// Request to join to grih async fn join_grih(&mut self, val: Value) { // Check is already joined match self.isthiti { Isthiti::None => (), _ => return } // is vayakti in watch list if let Isthiti::VraktigatWaitlist = self.isthiti { self.send_ok_response("watchlist"); return; } let kunjika = match val.get("kunjika") { Some(val ) => val.as_str().unwrap().to_owned(), None => { self.send_err_response("Invalid request"); return; } }; let name = match val.get("name") { Some(val ) => val.as_str().unwrap().to_owned(), None => { self.send_err_response("Invalid request"); return; } }; let grih_kunjika = match val.get("grih_kunjika") { Some(val ) => val.as_str().unwrap().to_owned(), None => { self.send_err_response("Invalid request"); return; } }; let length: Option = match val.get("length") { Some(val) => match val.as_i64(){ Some(val) => Some(val as usize), None => None }, None => None }; // Validate if let Some(val ) = validate(vec![vl::NonEmpty, vl::NoGupt, vl::NoSpace], &grih_kunjika, "Grih Kunjika") { self.send_err_response(&val); return; } else if let Some(val ) = validate(vec![vl::NonEmpty, vl::NoSpace, vl::NoHashtag], &kunjika, "Kunjika") { self.send_err_response(&val); return; } else if let Some(val ) = validate(vec![vl::NonEmpty], &name, "Name") { self.send_err_response(&val); return; } // request let result: Resp = ChatPinnd::from_registry().send(ms::JoinGrih { grih_kunjika: grih_kunjika.to_owned(), length, addr: self.addr.clone().unwrap(), kunjika: kunjika.to_owned(), name }).await.unwrap(); match result { Resp::Err(err) => self.send_err_response(&err), Resp::Ok => { self.isthiti = Isthiti::Grih(grih_kunjika); self.kunjika = kunjika; self.send_ok_response("joined") } _ => () } } /// Request to join to grih async fn list(&mut self) { // check if vayakti exist if let Isthiti::None = self.isthiti { self.send_err_response("Not in any Grih"); return; } // check if connected to any grih match &self.isthiti { Isthiti::Grih(kunjika) => { let json: String = ChatPinnd::from_registry().send(ms::List { grih_kunjika: kunjika.to_owned() }).await.unwrap(); self.addr.clone().unwrap().do_send(ms::WsList { json }) }, _ => { self.send_err_response("Grih not connected"); return; } } } /// send text to vayakti in grih async fn send_text(&mut self, val: Value) { // check if vayakti exist if let Isthiti::None = self.isthiti { self.send_err_response("Not in any Grih"); return; } // check if connected to any grih match self.isthiti { Isthiti::Grih(_) => (), _ => { self.send_err_response("Grih not connected"); return; } } // sent text let text = match val.get("text") { Some(val) => val, None => { self.send_err_response("Invalid request"); return; } }.as_str().unwrap().to_owned(); let reply: Option = match val.get("reply") { Some(val) => Some(val.as_str().unwrap().to_owned()), None => None }; let grih_kunjika = match &self.isthiti { Isthiti::Grih(grih_kunjika) => { grih_kunjika.to_owned() }, _ => { return; } }; Broker::::issue_async(ms::SendText { grih_kunjika, kunjika: self.kunjika.to_owned(), text, reply }); } /// send status to vayakti in grih async fn send_status(&mut self, val: Value) { // check if vayakti exist if let Isthiti::None = self.isthiti { self.send_err_response("Not in any Grih"); return; } // check if connected to any grih match self.isthiti { Isthiti::Grih(_) => (), _ => { self.send_err_response("Grih not connected"); return; } } // sent status let status = match val.get("status") { Some(val) => val, None => { self.send_err_response("Invalid request"); return; } }.as_str().unwrap().to_owned(); let grih_kunjika = match &self.isthiti { Isthiti::Grih(grih_kunjika) => { grih_kunjika.to_owned() }, _ => { return; } }; Broker::::issue_async(ms::SendStatus { grih_kunjika, kunjika: self.kunjika.to_owned(), status }); } /// notify leaving async fn leave_grih(&mut self) { let grih_kunjika = match &self.isthiti { Isthiti::Grih(val) => Some(val.to_owned()), _ => None }; Broker::::issue_async(ms::LeaveUser { grih_kunjika, kunjika: self.kunjika.to_owned(), addr: self.addr.clone().unwrap() }); self.isthiti = Isthiti::None; self.send_ok_response("left"); } }