Broker based system

This commit is contained in:
Piyush मिश्रः 2021-02-06 23:10:47 +05:30
parent d4ef5636ef
commit 22d2592311
4 changed files with 81 additions and 83 deletions

View File

@ -2,9 +2,11 @@
use std::collections::HashMap; use std::collections::HashMap;
use actix::{Actor, Addr, Context, Handler, Message}; use actix::prelude::*;
use actix_broker::BrokerSubscribe;
use crate::ws_sansad; use crate::ws_sansad;
use crate::messages as ms;
#[allow(dead_code)] #[allow(dead_code)]
pub struct ChatPinnd { pub struct ChatPinnd {
@ -18,41 +20,21 @@ pub struct Grih {
clients: Vec<Addr<ws_sansad::WsSansad>> clients: Vec<Addr<ws_sansad::WsSansad>>
} }
// Handler Messages
pub struct Join {
pub grih: JoinType,
pub length: Option<usize>,
pub addr: Addr<ws_sansad::WsSansad>
}
#[allow(dead_code)]
pub enum JoinType {
Name(String),
Kunjika(i32)
}
pub struct Text {
pub grih_kunjika: i32,
pub sender_name: String,
pub text: String
}
pub struct Delete {
pub grih_kunjika: i32,
pub addr: Addr<ws_sansad::WsSansad>
}
impl Actor for ChatPinnd { impl Actor for ChatPinnd {
type Context = Context<Self>; type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.subscribe_system_async::<ms::ReciveText>(ctx);
self.subscribe_system_async::<ms::LeaveUser>(ctx);
}
} }
impl Handler<Join> for ChatPinnd { impl Handler<ms::JoinUser> for ChatPinnd {
type Result = Option<i32>; type Result = Option<i32>;
fn handle(&mut self, msg: Join, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: ms::JoinUser, _: &mut Self::Context) -> Self::Result {
println!("Came to join");
match msg.grih { match msg.grih {
JoinType::Name(name) => { ms::JoinUserGrihType::Name(name) => {
let mat = Some(name.clone()); let mat = Some(name.clone());
if let Some((kunjika, grih)) = if let Some((kunjika, grih)) =
self.grih.iter_mut().find(|(_, g)| g.name == mat) { self.grih.iter_mut().find(|(_, g)| g.name == mat) {
@ -69,7 +51,6 @@ impl Handler<Join> for ChatPinnd {
while self.grih.contains_key(&kunjika) { while self.grih.contains_key(&kunjika) {
kunjika = rand::random::<i32>(); kunjika = rand::random::<i32>();
} }
println!("Creating {}", name);
self.grih.insert(kunjika, Grih { self.grih.insert(kunjika, Grih {
name: Some(name), name: Some(name),
length: msg.length, length: msg.length,
@ -77,7 +58,7 @@ impl Handler<Join> for ChatPinnd {
}); });
return Some(kunjika); return Some(kunjika);
}, JoinType::Kunjika(kunjika) => { }, ms::JoinUserGrihType::Kunjika(kunjika) => {
match self.grih.get_mut(&kunjika) { match self.grih.get_mut(&kunjika) {
Some(grih) => { Some(grih) => {
if let Some(val) = grih.length { if let Some(val) = grih.length {
@ -100,14 +81,14 @@ impl Handler<Join> for ChatPinnd {
} }
} }
impl Handler<Text> for ChatPinnd { impl Handler<ms::ReciveText> for ChatPinnd {
type Result = (); type Result = ();
fn handle(&mut self, msg: Text, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: ms::ReciveText, _: &mut Self::Context) -> Self::Result {
println!("Here to text"); println!("Here to text");
if let Some(grih) = self.grih.get(&msg.grih_kunjika) { if let Some(grih) = self.grih.get(&msg.grih_kunjika) {
for client in grih.clients.iter() { for client in grih.clients.iter() {
client.do_send(ws_sansad::Text { client.do_send(ms::WsMessage {
sender: msg.sender_name.clone(), sender: msg.sender_name.clone(),
text: msg.text.clone(), text: msg.text.clone(),
}); });
@ -116,10 +97,10 @@ impl Handler<Text> for ChatPinnd {
} }
} }
impl Handler<Delete> for ChatPinnd { impl Handler<ms::LeaveUser> for ChatPinnd {
type Result = (); type Result = ();
fn handle(&mut self, msg: Delete, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: ms::LeaveUser, _: &mut Self::Context) -> Self::Result {
if let Some(grih) = self.grih.get_mut(&msg.grih_kunjika) { if let Some(grih) = self.grih.get_mut(&msg.grih_kunjika) {
if let Some(i) = grih.clients.iter().position(|x| x == &msg.addr) { if let Some(i) = grih.clients.iter().position(|x| x == &msg.addr) {
grih.clients.remove(i); grih.clients.remove(i);
@ -132,19 +113,14 @@ impl Handler<Delete> for ChatPinnd {
} }
} }
impl ChatPinnd { impl Default for ChatPinnd {
pub fn new() -> Self { fn default() -> Self {
ChatPinnd { ChatPinnd {
grih: HashMap::new(), grih: HashMap::new(),
vyaktigat_waitlist: Vec::new() vyaktigat_waitlist: Vec::new()
} }
} }
pub fn start() -> Addr<ChatPinnd> {
ChatPinnd::new().start()
}
} }
impl Message for Join { type Result = Option<i32>; } impl SystemService for ChatPinnd {}
impl Message for Text { type Result = (); } impl Supervised for ChatPinnd {}
impl Message for Delete { type Result = (); }

View File

@ -1,22 +1,19 @@
use actix::Addr;
use actix_web::{App, Error, HttpRequest, HttpResponse, HttpServer, web}; use actix_web::{App, Error, HttpRequest, HttpResponse, HttpServer, web};
use actix_files as fs; use actix_files as fs;
use actix_web_actors::ws; use actix_web_actors::ws;
use chat_pinnd::ChatPinnd;
use ws_sansad::WsSansad; use ws_sansad::WsSansad;
mod config; mod config;
mod messages;
mod ws_sansad; mod ws_sansad;
mod chat_pinnd; mod chat_pinnd;
#[actix_web::main] #[actix_web::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
let config = config::Config::new(); let config = config::Config::new();
let addr = web::Data::new(ChatPinnd::start());
let static_path = config.static_path; let static_path = config.static_path;
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.app_data(addr.clone())
.service(web::resource("/ws/").route(web::get().to(ws_index))) .service(web::resource("/ws/").route(web::get().to(ws_index)))
.service(fs::Files::new("/", &static_path).index_file("index.html")) .service(fs::Files::new("/", &static_path).index_file("index.html"))
}) })
@ -25,8 +22,6 @@ async fn main() -> std::io::Result<()> {
.await .await
} }
async fn ws_index(req: HttpRequest, stream: web::Payload, pinnd: web::Data<Addr<ChatPinnd>>) -> Result<HttpResponse, Error> { async fn ws_index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
let (addr, resp) = ws::start_with_addr(WsSansad::new(pinnd), &req, stream)?; ws::start(WsSansad::new(), &req, stream)
addr.do_send(ws_sansad::SelfAddr(addr.clone()));
Ok(resp)
} }

43
src/messages.rs Normal file
View File

@ -0,0 +1,43 @@
//! Messages to be sent between Actors
use actix::prelude::*;
use crate::ws_sansad::WsSansad;
// For ChatPinnd
#[derive(Clone, Message)]
#[rtype(result = "Option<i32>")]
pub struct JoinUser {
pub grih: JoinUserGrihType,
pub length: Option<usize>,
pub addr: Addr<WsSansad>
}
#[allow(dead_code)]
#[derive(Clone)]
pub enum JoinUserGrihType {
Name(String),
Kunjika(i32)
}
#[derive(Clone, Message)]
#[rtype(result = "()")]
pub struct ReciveText {
pub grih_kunjika: i32,
pub sender_name: String,
pub text: String
}
#[derive(Clone, Message)]
#[rtype(result = "()")]
pub struct LeaveUser {
pub grih_kunjika: i32,
pub addr: Addr<WsSansad>
}
// For WsSansad
#[derive(Clone, Message)]
#[rtype(result = "()")]
pub struct WsMessage {
pub text: String,
pub sender: String
}

View File

@ -1,16 +1,15 @@
//! Ws Sansad manage websocket of each client //! Ws Sansad manage websocket of each client
use actix::{Actor, Addr, Handler, Message, Running, StreamHandler}; use actix::prelude::*;
use actix_web::web; use actix_broker::{Broker, SystemBroker};
use actix_web_actors::ws; use actix_web_actors::ws;
use serde_json::{json, Value}; use serde_json::{json, Value};
use crate::chat_pinnd::ChatPinnd; use crate::chat_pinnd::ChatPinnd;
use crate::chat_pinnd as pd; use crate::messages as ms;
pub struct WsSansad { pub struct WsSansad {
name: String, name: String,
isthiti: Isthiti, isthiti: Isthiti,
pinnd: web::Data<Addr<ChatPinnd>>,
addr: Option<Addr<Self>> addr: Option<Addr<Self>>
} }
@ -26,17 +25,13 @@ pub struct Grih {
// name: String // name: String
} }
// Handler Messages
pub struct Text {
pub text: String,
pub sender: String
}
pub struct SelfAddr(pub Addr<WsSansad>);
impl Actor for WsSansad { impl Actor for WsSansad {
type Context = ws::WebsocketContext<Self>; type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.addr = Some(ctx.address().clone());
}
fn stopping(&mut self, _: &mut Self::Context) -> Running { fn stopping(&mut self, _: &mut Self::Context) -> Running {
futures::executor::block_on(self.end()); futures::executor::block_on(self.end());
Running::Stop Running::Stop
@ -58,9 +53,9 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsSansad {
} }
} }
impl Handler<Text> for WsSansad { impl Handler<ms::WsMessage> for WsSansad {
type Result = (); type Result = ();
fn handle(&mut self, msg: Text, ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: ms::WsMessage, ctx: &mut Self::Context) -> Self::Result {
let json = json!({ let json = json!({
"cmd": "text", "cmd": "text",
"text": msg.text, "text": msg.text,
@ -70,19 +65,11 @@ impl Handler<Text> for WsSansad {
} }
} }
impl Handler<SelfAddr> for WsSansad {
type Result = ();
fn handle(&mut self, msg: SelfAddr, _: &mut Self::Context) -> Self::Result {
self.addr = Some(msg.0);
}
}
impl WsSansad { impl WsSansad {
pub fn new(pinnd: web::Data<Addr<ChatPinnd>>) -> Self { pub fn new() -> Self {
WsSansad { WsSansad {
name: "()".to_owned(), name: "()".to_owned(),
isthiti: Isthiti::None, isthiti: Isthiti::None,
pinnd,
addr: None addr: None
} }
} }
@ -113,7 +100,7 @@ impl WsSansad {
} }
}; };
self.pinnd.do_send(pd::Text { Broker::<SystemBroker>::issue_async(ms::ReciveText {
grih_kunjika, grih_kunjika,
sender_name: self.name.clone(), sender_name: self.name.clone(),
text text
@ -127,8 +114,8 @@ impl WsSansad {
None => None None => None
}; };
let kunjika = self.pinnd.send(pd::Join{ let kunjika = ChatPinnd::from_registry().send(ms::JoinUser{
grih: pd::JoinType::Name(name.clone()), grih: ms::JoinUserGrihType::Name(name.clone()),
length, length,
addr: self.addr.clone().unwrap() addr: self.addr.clone().unwrap()
}).await.unwrap().unwrap(); }).await.unwrap().unwrap();
@ -141,13 +128,10 @@ impl WsSansad {
async fn end(&mut self) { async fn end(&mut self) {
if let Isthiti::Grih(val) = &mut self.isthiti { if let Isthiti::Grih(val) = &mut self.isthiti {
self.pinnd.do_send(pd::Delete { Broker::<SystemBroker>::issue_async(ms::LeaveUser {
grih_kunjika: val.kunjika, grih_kunjika: val.kunjika,
addr: self.addr.clone().unwrap() addr: self.addr.clone().unwrap()
}); });
} }
} }
} }
impl Message for Text { type Result = (); }
impl Message for SelfAddr { type Result = (); }