This commit is contained in:
Piyush मिश्रः 2022-01-19 16:58:37 +05:30
parent 37a19916c0
commit 46797ef2e2
17 changed files with 562 additions and 394 deletions

View File

@ -15,9 +15,9 @@
along with Lupt. If not, see <https://www.gnu.org/licenses/> along with Lupt. If not, see <https://www.gnu.org/licenses/>
*/ */
use crate::ws_sansad::WsSansad;
use actix::prelude::*; use actix::prelude::*;
use dev::{MessageResponse, ResponseChannel}; use dev::{MessageResponse, ResponseChannel};
use crate::ws_sansad::WsSansad;
pub mod pind; pub mod pind;
pub mod sansad; pub mod sansad;

View File

@ -16,8 +16,8 @@
*/ */
//! Messages to be sent between Actors //! Messages to be sent between Actors
use super::*;
use super::util::Resp; use super::util::Resp;
use super::*;
//################################################## For ChatPinnd ################################################## //################################################## For ChatPinnd ##################################################
/// Request to change information of vayakti to list of vayakti im ChatPind /// Request to change information of vayakti to list of vayakti im ChatPind
@ -47,14 +47,14 @@ pub struct JoinRandom {
#[rtype(result = "Resp")] #[rtype(result = "Resp")]
pub struct JoinRandomNext { pub struct JoinRandomNext {
pub kaksh_kunjika: String, pub kaksh_kunjika: String,
pub kunjika: String pub kunjika: String,
} }
// Request to send list of users // Request to send list of users
#[derive(Clone, Message)] #[derive(Clone, Message)]
#[rtype(result = "String")] #[rtype(result = "String")]
pub struct List { pub struct List {
pub kaksh_kunjika: String pub kaksh_kunjika: String,
} }
/// Request to leave kaksh /// Request to leave kaksh
@ -63,7 +63,7 @@ pub struct List {
pub struct LeaveVayakti { pub struct LeaveVayakti {
pub kaksh_kunjika: Option<String>, pub kaksh_kunjika: Option<String>,
pub kunjika: String, pub kunjika: String,
pub addr: Addr<WsSansad> pub addr: Addr<WsSansad>,
} }
/// Request to send text /// Request to send text
@ -82,7 +82,7 @@ pub struct SendText {
pub struct SendImage { pub struct SendImage {
pub kaksh_kunjika: String, pub kaksh_kunjika: String,
pub kunjika: String, pub kunjika: String,
pub src: String pub src: String,
} }
/// Request to reaction /// Request to reaction
#[derive(Clone, Message)] #[derive(Clone, Message)]
@ -91,7 +91,7 @@ pub struct SendReaction {
pub kaksh_kunjika: String, pub kaksh_kunjika: String,
pub kunjika: String, pub kunjika: String,
pub emoji: String, pub emoji: String,
pub msg_id: String pub msg_id: String,
} }
// Request to send status // Request to send status
@ -100,7 +100,7 @@ pub struct SendReaction {
pub struct SendStatus { pub struct SendStatus {
pub kaksh_kunjika: String, pub kaksh_kunjika: String,
pub kunjika: String, pub kunjika: String,
pub status: String pub status: String,
} }
// Request to delete messages // Request to delete messages
@ -109,7 +109,7 @@ pub struct SendStatus {
pub struct DeleteMsg { pub struct DeleteMsg {
pub kaksh_kunjika: String, pub kaksh_kunjika: String,
pub kunjika: String, pub kunjika: String,
pub msg_id: Vec<String> pub msg_id: Vec<String>,
} }
// Request to edit messages // Request to edit messages
@ -119,5 +119,5 @@ pub struct EditMsg {
pub kaksh_kunjika: String, pub kaksh_kunjika: String,
pub kunjika: String, pub kunjika: String,
pub text: String, pub text: String,
pub msg_id: String pub msg_id: String,
} }

View File

@ -25,7 +25,7 @@ use super::*;
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct WsConnected { pub struct WsConnected {
pub name: String, pub name: String,
pub kunjika: String pub kunjika: String,
} }
// Got connected to random vayakti // Got connected to random vayakti
@ -34,21 +34,21 @@ pub struct WsConnected {
pub struct WsConnectedRandom { pub struct WsConnectedRandom {
pub name: String, pub name: String,
pub kunjika: String, pub kunjika: String,
pub kaksh_kunjika: String pub kaksh_kunjika: String,
} }
// Request to send hash calculated of kunjika // Request to send hash calculated of kunjika
#[derive(Clone, Message)] #[derive(Clone, Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct WsKunjikaHash { pub struct WsKunjikaHash {
pub kunjika: String pub kunjika: String,
} }
// Request to send list // Request to send list
#[derive(Clone, Message)] #[derive(Clone, Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct WsList { pub struct WsList {
pub json: String pub json: String,
} }
// Notify someone disconnected // Notify someone disconnected
@ -56,10 +56,9 @@ pub struct WsList {
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct WsDisconnected { pub struct WsDisconnected {
pub kunjika: String, pub kunjika: String,
pub name: String pub name: String,
} }
// Request to send Text // Request to send Text
#[derive(Clone, Message)] #[derive(Clone, Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
@ -67,7 +66,7 @@ pub struct WsText {
pub text: String, pub text: String,
pub reply: Option<String>, pub reply: Option<String>,
pub sender_kunjika: String, pub sender_kunjika: String,
pub msg_id: u128 pub msg_id: u128,
} }
// Request to send Image // Request to send Image
@ -76,7 +75,7 @@ pub struct WsText {
pub struct WsImage { pub struct WsImage {
pub src: String, pub src: String,
pub sender_kunjika: String, pub sender_kunjika: String,
pub msg_id: u128 pub msg_id: u128,
} }
// Request to send Reaction // Request to send Reaction
#[derive(Clone, Message)] #[derive(Clone, Message)]
@ -84,16 +83,15 @@ pub struct WsImage {
pub struct WsReaction { pub struct WsReaction {
pub emoji: String, pub emoji: String,
pub sender_kunjika: String, pub sender_kunjika: String,
pub msg_id: String pub msg_id: String,
} }
// Request to send Status // Request to send Status
#[derive(Clone, Message)] #[derive(Clone, Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct WsStatus { pub struct WsStatus {
pub status: String, pub status: String,
pub sender_kunjika: String pub sender_kunjika: String,
} }
// Request to delete messages // Request to delete messages
@ -101,7 +99,7 @@ pub struct WsStatus {
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct WsDeleteMsg { pub struct WsDeleteMsg {
pub msg_id: Vec<String>, pub msg_id: Vec<String>,
pub sender_kunjika: String pub sender_kunjika: String,
} }
// Request to edit messages // Request to edit messages
@ -110,7 +108,7 @@ pub struct WsDeleteMsg {
pub struct WsEditMsg { pub struct WsEditMsg {
pub text: String, pub text: String,
pub sender_kunjika: String, pub sender_kunjika: String,
pub msg_id: String pub msg_id: String,
} }
// Give response message // Give response message
@ -118,5 +116,5 @@ pub struct WsEditMsg {
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct WsResponse { pub struct WsResponse {
pub result: String, pub result: String,
pub message: String pub message: String,
} }

View File

@ -22,7 +22,7 @@ use super::*;
pub enum Resp { pub enum Resp {
Ok, Ok,
Err(String), Err(String),
None None,
} }
impl<A, M> MessageResponse<A, M> for Resp impl<A, M> MessageResponse<A, M> for Resp

View File

@ -30,7 +30,7 @@ impl Handler<ms::pind::SendText> for ChatPinnd {
sender_kunjika: msg.kunjika.to_owned(), sender_kunjika: msg.kunjika.to_owned(),
text: msg.text.to_owned(), text: msg.text.to_owned(),
reply: msg.reply.to_owned(), reply: msg.reply.to_owned(),
msg_id msg_id,
}); });
}); });
} }
@ -49,7 +49,7 @@ impl Handler<ms::pind::SendImage> for ChatPinnd {
c.addr.do_send(ms::sansad::WsImage { c.addr.do_send(ms::sansad::WsImage {
sender_kunjika: msg.kunjika.to_owned(), sender_kunjika: msg.kunjika.to_owned(),
src: msg.src.to_owned(), src: msg.src.to_owned(),
msg_id msg_id,
}); });
}); });
} }
@ -66,7 +66,7 @@ impl Handler<ms::pind::SendReaction> for ChatPinnd {
c.addr.do_send(ms::sansad::WsReaction { c.addr.do_send(ms::sansad::WsReaction {
sender_kunjika: msg.kunjika.to_owned(), sender_kunjika: msg.kunjika.to_owned(),
emoji: msg.emoji.to_owned(), emoji: msg.emoji.to_owned(),
msg_id: msg.msg_id.to_owned() msg_id: msg.msg_id.to_owned(),
}); });
}); });
} }
@ -101,14 +101,13 @@ impl Handler<ms::pind::DeleteMsg> for ChatPinnd {
kaksh.loog.iter().for_each(|c| { kaksh.loog.iter().for_each(|c| {
c.addr.do_send(ms::sansad::WsDeleteMsg { c.addr.do_send(ms::sansad::WsDeleteMsg {
sender_kunjika: msg.kunjika.to_owned(), sender_kunjika: msg.kunjika.to_owned(),
msg_id: msg.msg_id.clone() msg_id: msg.msg_id.clone(),
}); });
}); });
} }
} }
} }
/// send edit messages for everyone /// send edit messages for everyone
impl Handler<ms::pind::EditMsg> for ChatPinnd { impl Handler<ms::pind::EditMsg> for ChatPinnd {
type Result = (); type Result = ();
@ -119,7 +118,7 @@ impl Handler<ms::pind::EditMsg> for ChatPinnd {
c.addr.do_send(ms::sansad::WsEditMsg { c.addr.do_send(ms::sansad::WsEditMsg {
sender_kunjika: msg.kunjika.to_owned(), sender_kunjika: msg.kunjika.to_owned(),
msg_id: msg.msg_id.to_owned(), msg_id: msg.msg_id.to_owned(),
text: msg.text.to_owned() text: msg.text.to_owned(),
}); });
}); });
} }

View File

@ -17,15 +17,15 @@
//! Chat Pinnd(पिण्ड) is Actor to manage Websocket Chat related action //! Chat Pinnd(पिण्ड) is Actor to manage Websocket Chat related action
mod user;
mod message; mod message;
mod user;
use std::{collections::HashMap, vec}; use std::{collections::HashMap, vec};
use actix::prelude::*; use actix::prelude::*;
use actix_broker::BrokerSubscribe; use actix_broker::BrokerSubscribe;
use crate::{ws_sansad, broker_messages as ms, broker_messages::util::Resp}; use crate::{broker_messages as ms, broker_messages::util::Resp, ws_sansad};
#[allow(dead_code)] #[allow(dead_code)]
pub struct ChatPinnd { pub struct ChatPinnd {
@ -36,26 +36,26 @@ pub struct ChatPinnd {
pub struct Kaksh { pub struct Kaksh {
length: Option<usize>, length: Option<usize>,
last_message_id: u128, last_message_id: u128,
loog: Vec<Loog> loog: Vec<Loog>,
} }
pub struct Loog { pub struct Loog {
addr: Addr<ws_sansad::WsSansad>, addr: Addr<ws_sansad::WsSansad>,
kunjika: String, kunjika: String,
name: String, name: String,
tags: Option<Vec<String>> tags: Option<Vec<String>>,
} }
#[derive(Debug, Clone)] // #[derive(Debug, Clone)]
pub struct Vyakti { // pub struct Vyakti {
name: String, // name: String,
tags: Vec<String> // tags: Vec<String>
} // }
pub struct VyaktiWatchlist { pub struct VyaktiWatchlist {
kunjika: String, kunjika: String,
name: String, name: String,
tags: Vec<String>, tags: Vec<String>,
addr: Addr<ws_sansad::WsSansad> addr: Addr<ws_sansad::WsSansad>,
} }
impl Actor for ChatPinnd { impl Actor for ChatPinnd {
@ -73,31 +73,30 @@ impl Actor for ChatPinnd {
} }
} }
impl Default for ChatPinnd { impl Default for ChatPinnd {
fn default() -> Self { fn default() -> Self {
ChatPinnd { ChatPinnd {
kaksh: HashMap::new(), kaksh: HashMap::new(),
vyaktigat_waitlist: Vec::new() vyaktigat_waitlist: Vec::new(),
} }
} }
} }
impl Loog { impl Loog {
fn new(addr: Addr<ws_sansad::WsSansad>, fn new(
addr: Addr<ws_sansad::WsSansad>,
kunjika: String, kunjika: String,
name: String, name: String,
tags: Option<Vec<String>>) -> Self { tags: Option<Vec<String>>,
) -> Self {
Loog { Loog {
addr, addr,
kunjika, kunjika,
name, name,
tags tags,
} }
} }
} }
impl SystemService for ChatPinnd {} impl SystemService for ChatPinnd {}
impl Supervised for ChatPinnd {} impl Supervised for ChatPinnd {}

View File

@ -23,14 +23,18 @@ impl Handler<ms::pind::JoinKaksh> for ChatPinnd {
fn handle(&mut self, msg: ms::pind::JoinKaksh, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: ms::pind::JoinKaksh, _: &mut Self::Context) -> Self::Result {
// check if user exist // check if user exist
if let Some(_) = self.vyaktigat_waitlist.iter().position(|vk| vk.kunjika == msg.kunjika) { if let Some(_) = self
.vyaktigat_waitlist
.iter()
.position(|vk| vk.kunjika == msg.kunjika)
{
return Resp::Err("Kunjika already exist".to_owned()); return Resp::Err("Kunjika already exist".to_owned());
} }
if let Some(_) = self.kaksh.iter().position(|(_, g)| { if let Some(_) = self.kaksh.iter().position(|(_, g)| {
match g.loog.iter().position(|a| a.kunjika == msg.kunjika) { match g.loog.iter().position(|a| a.kunjika == msg.kunjika) {
Some(_) => true, Some(_) => true,
None => false None => false,
} }
}) { }) {
return Resp::Err("Kunjika already exist".to_owned()); return Resp::Err("Kunjika already exist".to_owned());
@ -38,7 +42,8 @@ impl Handler<ms::pind::JoinKaksh> for ChatPinnd {
// check if kaksh exist and add user // check if kaksh exist and add user
match self.kaksh.get_mut(&msg.kaksh_kunjika) { match self.kaksh.get_mut(&msg.kaksh_kunjika) {
Some(kaksh) =>{ // exist Some(kaksh) => {
// exist
// check if kaksh have no space left // check if kaksh have no space left
if let Some(n) = kaksh.length { if let Some(n) = kaksh.length {
if kaksh.loog.len() >= n { if kaksh.loog.len() >= n {
@ -49,24 +54,29 @@ impl Handler<ms::pind::JoinKaksh> for ChatPinnd {
kaksh.loog.iter().for_each(|a: &Loog| { kaksh.loog.iter().for_each(|a: &Loog| {
a.addr.do_send(ms::sansad::WsConnected { a.addr.do_send(ms::sansad::WsConnected {
name: msg.name.to_owned(), name: msg.name.to_owned(),
kunjika: msg.kunjika.to_owned() kunjika: msg.kunjika.to_owned(),
}) })
}); });
kaksh.loog.push(Loog::new(msg.addr, msg.kunjika,msg.name, None)); kaksh
.loog
.push(Loog::new(msg.addr, msg.kunjika, msg.name, None));
}, None => { // don't exist }
None => {
// don't exist
// add kaksh and notify // add kaksh and notify
msg.addr.do_send(ms::sansad::WsConnected { msg.addr.do_send(ms::sansad::WsConnected {
name: msg.name.to_owned(), name: msg.name.to_owned(),
kunjika: msg.kunjika.to_owned() kunjika: msg.kunjika.to_owned(),
}); });
self.kaksh.insert(msg.kaksh_kunjika, Kaksh { self.kaksh.insert(
msg.kaksh_kunjika,
Kaksh {
length: msg.length, length: msg.length,
last_message_id: 0, last_message_id: 0,
loog: vec![Loog::new(msg.addr,msg.kunjika,msg.name, None)] loog: vec![Loog::new(msg.addr, msg.kunjika, msg.name, None)],
}); },
);
} }
} }
@ -82,14 +92,18 @@ impl Handler<ms::pind::JoinRandom> for ChatPinnd {
type Result = Resp; type Result = Resp;
fn handle(&mut self, msg: ms::pind::JoinRandom, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: ms::pind::JoinRandom, _: &mut Self::Context) -> Self::Result {
// check if user exist // check if user exist
if let Some(_) = self.vyaktigat_waitlist.iter().position(|vk| vk.kunjika == msg.kunjika) { if let Some(_) = self
.vyaktigat_waitlist
.iter()
.position(|vk| vk.kunjika == msg.kunjika)
{
return Resp::Err("Kunjika already exist".to_owned()); return Resp::Err("Kunjika already exist".to_owned());
} }
if let Some(_) = self.kaksh.iter().position(|(_, g)| { if let Some(_) = self.kaksh.iter().position(|(_, g)| {
match g.loog.iter().position(|a| a.kunjika == msg.kunjika) { match g.loog.iter().position(|a| a.kunjika == msg.kunjika) {
Some(_) => true, Some(_) => true,
None => false None => false,
} }
}) { }) {
return Resp::Err("Kunjika already exist".to_owned()); return Resp::Err("Kunjika already exist".to_owned());
@ -101,7 +115,7 @@ impl Handler<ms::pind::JoinRandom> for ChatPinnd {
kunjika: msg.kunjika, kunjika: msg.kunjika,
addr: msg.addr, addr: msg.addr,
name: msg.name, name: msg.name,
tags: msg.tags tags: msg.tags,
}); });
return Resp::None; return Resp::None;
} }
@ -111,7 +125,7 @@ impl Handler<ms::pind::JoinRandom> for ChatPinnd {
match self.vyaktigat_waitlist.iter().position(|vk| { match self.vyaktigat_waitlist.iter().position(|vk| {
match vk.tags.iter().position(|t| msg.tags.contains(t)) { match vk.tags.iter().position(|t| msg.tags.contains(t)) {
Some(_) => true, Some(_) => true,
None => false None => false,
} }
}) { }) {
Some(i) => i, Some(i) => i,
@ -120,32 +134,55 @@ impl Handler<ms::pind::JoinRandom> for ChatPinnd {
kunjika: msg.kunjika, kunjika: msg.kunjika,
addr: msg.addr, addr: msg.addr,
name: msg.name, name: msg.name,
tags: msg.tags tags: msg.tags,
}); });
return Resp::None; return Resp::None;
} }
} }
} else { 0 }; } else {
0
};
let vayakti_watchlist = self.vyaktigat_waitlist.remove(pos); let vayakti_watchlist = self.vyaktigat_waitlist.remove(pos);
let group_kunjika = format!("gupt_{}>{}",msg.kunjika.to_owned(), vayakti_watchlist.kunjika); let group_kunjika = format!(
self.kaksh.insert(group_kunjika.to_owned(), Kaksh { "gupt_{}>{}",
msg.kunjika.to_owned(),
vayakti_watchlist.kunjika
);
self.kaksh.insert(
group_kunjika.to_owned(),
Kaksh {
length: Some(2), length: Some(2),
last_message_id: 0, last_message_id: 0,
loog: vec![Loog::new(msg.addr.clone(), msg.kunjika.to_owned(), msg.name.to_owned(), Some(msg.tags.clone())), loog: vec![
Loog::new(vayakti_watchlist.addr.clone(), vayakti_watchlist.kunjika.to_owned(), vayakti_watchlist.name.to_owned(), Some(vayakti_watchlist.tags.clone()))] Loog::new(
}); msg.addr.clone(),
msg.kunjika.to_owned(),
msg.name.to_owned(),
Some(msg.tags.clone()),
),
Loog::new(
vayakti_watchlist.addr.clone(),
vayakti_watchlist.kunjika.to_owned(),
vayakti_watchlist.name.to_owned(),
Some(vayakti_watchlist.tags.clone()),
),
],
},
);
// notify about connection // notify about connection
msg.addr.do_send(ms::sansad::WsConnectedRandom { msg.addr.do_send(ms::sansad::WsConnectedRandom {
name: vayakti_watchlist.name, name: vayakti_watchlist.name,
kunjika: vayakti_watchlist.kunjika, kunjika: vayakti_watchlist.kunjika,
kaksh_kunjika: group_kunjika.to_owned() kaksh_kunjika: group_kunjika.to_owned(),
}); });
vayakti_watchlist.addr.do_send(ms::sansad::WsConnectedRandom { vayakti_watchlist
.addr
.do_send(ms::sansad::WsConnectedRandom {
name: msg.name, name: msg.name,
kunjika: msg.kunjika.to_owned(), kunjika: msg.kunjika.to_owned(),
kaksh_kunjika: group_kunjika kaksh_kunjika: group_kunjika,
}); });
Resp::Ok Resp::Ok
@ -158,12 +195,12 @@ impl Handler<ms::pind::JoinRandomNext> for ChatPinnd {
fn handle(&mut self, msg: ms::pind::JoinRandomNext, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: ms::pind::JoinRandomNext, _: &mut Self::Context) -> Self::Result {
let kaksh = match self.kaksh.get_mut(&msg.kaksh_kunjika) { let kaksh = match self.kaksh.get_mut(&msg.kaksh_kunjika) {
Some(v) => v, Some(v) => v,
None => return Resp::Err("Failed to join, check entries!".to_owned()) None => return Resp::Err("Failed to join, check entries!".to_owned()),
}; };
let loog_i = match kaksh.loog.iter().position(|a| a.kunjika == msg.kunjika) { let loog_i = match kaksh.loog.iter().position(|a| a.kunjika == msg.kunjika) {
Some(v) => v, Some(v) => v,
None => return Resp::Err("Failed to join, check entries!".to_owned()) None => return Resp::Err("Failed to join, check entries!".to_owned()),
}; };
let addr; let addr;
@ -173,7 +210,7 @@ impl Handler<ms::pind::JoinRandomNext> for ChatPinnd {
{ {
let loog = match kaksh.loog.get(loog_i) { let loog = match kaksh.loog.get(loog_i) {
Some(v) => v, Some(v) => v,
None => return Resp::Err("Failed to join, check entries!".to_owned()) None => return Resp::Err("Failed to join, check entries!".to_owned()),
}; };
if let None = loog.tags { if let None = loog.tags {
@ -184,7 +221,7 @@ impl Handler<ms::pind::JoinRandomNext> for ChatPinnd {
name = loog.name.to_owned(); name = loog.name.to_owned();
tags = match loog.tags.clone() { tags = match loog.tags.clone() {
Some(v) => v, Some(v) => v,
None => return Resp::Err("Failed to join, check entries!".to_owned()) None => return Resp::Err("Failed to join, check entries!".to_owned()),
}; };
} }
@ -193,7 +230,7 @@ impl Handler<ms::pind::JoinRandomNext> for ChatPinnd {
kaksh.loog.iter().for_each(|a| { kaksh.loog.iter().for_each(|a| {
a.addr.do_send(ms::sansad::WsDisconnected { a.addr.do_send(ms::sansad::WsDisconnected {
kunjika: msg.kunjika.to_owned(), kunjika: msg.kunjika.to_owned(),
name: name.to_owned() name: name.to_owned(),
}) })
}); });
@ -203,7 +240,7 @@ impl Handler<ms::pind::JoinRandomNext> for ChatPinnd {
kunjika: msg.kunjika, kunjika: msg.kunjika,
addr, addr,
name, name,
tags tags,
}); });
return Resp::None; return Resp::None;
} }
@ -212,7 +249,7 @@ impl Handler<ms::pind::JoinRandomNext> for ChatPinnd {
match self.vyaktigat_waitlist.iter().position(|vk| { match self.vyaktigat_waitlist.iter().position(|vk| {
match vk.tags.iter().position(|t| tags.contains(t)) { match vk.tags.iter().position(|t| tags.contains(t)) {
Some(_) => true, Some(_) => true,
None => false None => false,
} }
}) { }) {
Some(i) => i, Some(i) => i,
@ -221,37 +258,60 @@ impl Handler<ms::pind::JoinRandomNext> for ChatPinnd {
kunjika: msg.kunjika, kunjika: msg.kunjika,
addr, addr,
name, name,
tags tags,
}); });
return Resp::None; return Resp::None;
} }
} }
} else { 0 }; } else {
0
};
let vayakti_watchlist = self.vyaktigat_waitlist.remove(pos); let vayakti_watchlist = self.vyaktigat_waitlist.remove(pos);
let group_kunjika = format!("gupt_{}>{}",msg.kunjika.to_owned(), vayakti_watchlist.kunjika); let group_kunjika = format!(
"gupt_{}>{}",
msg.kunjika.to_owned(),
vayakti_watchlist.kunjika
);
let log_count = kaksh.loog.len(); let log_count = kaksh.loog.len();
drop(kaksh); drop(kaksh);
if log_count == 0 { if log_count == 0 {
self.kaksh.remove(&msg.kaksh_kunjika); self.kaksh.remove(&msg.kaksh_kunjika);
} }
self.kaksh.insert(group_kunjika.to_owned(), Kaksh { self.kaksh.insert(
group_kunjika.to_owned(),
Kaksh {
length: Some(2), length: Some(2),
last_message_id: 0, last_message_id: 0,
loog: vec![Loog::new(addr.clone(), msg.kunjika.to_owned(), name.to_owned(), Some(tags.clone())), loog: vec![
Loog::new(vayakti_watchlist.addr.clone(), vayakti_watchlist.kunjika.to_owned(), vayakti_watchlist.name.to_owned(), Some(vayakti_watchlist.tags.clone()))] Loog::new(
}); addr.clone(),
msg.kunjika.to_owned(),
name.to_owned(),
Some(tags.clone()),
),
Loog::new(
vayakti_watchlist.addr.clone(),
vayakti_watchlist.kunjika.to_owned(),
vayakti_watchlist.name.to_owned(),
Some(vayakti_watchlist.tags.clone()),
),
],
},
);
// notify about connection // notify about connection
addr.do_send(ms::sansad::WsConnectedRandom { addr.do_send(ms::sansad::WsConnectedRandom {
name: vayakti_watchlist.name, name: vayakti_watchlist.name,
kunjika: vayakti_watchlist.kunjika, kunjika: vayakti_watchlist.kunjika,
kaksh_kunjika: group_kunjika.to_owned() kaksh_kunjika: group_kunjika.to_owned(),
}); });
vayakti_watchlist.addr.do_send(ms::sansad::WsConnectedRandom { vayakti_watchlist
.addr
.do_send(ms::sansad::WsConnectedRandom {
name, name,
kunjika: msg.kunjika.to_owned(), kunjika: msg.kunjika.to_owned(),
kaksh_kunjika: group_kunjika kaksh_kunjika: group_kunjika,
}); });
Resp::Ok Resp::Ok
@ -285,12 +345,14 @@ impl Handler<ms::pind::LeaveVayakti> for ChatPinnd {
if kaksh.loog.len() > 1 { if kaksh.loog.len() > 1 {
let name = if let Some(i) = kaksh.loog.iter().position(|x| x.addr == msg.addr) { let name = if let Some(i) = kaksh.loog.iter().position(|x| x.addr == msg.addr) {
kaksh.loog.remove(i).name kaksh.loog.remove(i).name
} else { "".to_owned() }; } else {
"".to_owned()
};
kaksh.loog.iter().for_each(|a| { kaksh.loog.iter().for_each(|a| {
a.addr.do_send(ms::sansad::WsDisconnected { a.addr.do_send(ms::sansad::WsDisconnected {
kunjika: msg.kunjika.to_owned(), kunjika: msg.kunjika.to_owned(),
name: name.to_owned() name: name.to_owned(),
}) })
}); });
} else { } else {
@ -299,7 +361,11 @@ impl Handler<ms::pind::LeaveVayakti> for ChatPinnd {
} }
} }
if let Some(i) = self.vyaktigat_waitlist.iter().position(|a| a.kunjika == msg.kunjika) { if let Some(i) = self
.vyaktigat_waitlist
.iter()
.position(|a| a.kunjika == msg.kunjika)
{
self.vyaktigat_waitlist.remove(i); self.vyaktigat_waitlist.remove(i);
} }
} }

View File

@ -23,7 +23,7 @@ pub struct Config {
pub bind_address: String, pub bind_address: String,
pub port: String, pub port: String,
pub port_x: String, pub port_x: String,
pub config: ConfigFile pub config: ConfigFile,
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
@ -32,7 +32,7 @@ pub struct ConfigFile {
pub tenor_key: String, pub tenor_key: String,
pub ssl_cert: String, pub ssl_cert: String,
pub ssl_key: String, pub ssl_key: String,
pub logger_pattern: String pub logger_pattern: String,
} }
impl Config { impl Config {
@ -41,47 +41,58 @@ impl Config {
.version(env!("CARGO_PKG_VERSION")) .version(env!("CARGO_PKG_VERSION"))
.author(env!("CARGO_PKG_AUTHORS")) .author(env!("CARGO_PKG_AUTHORS"))
.about(env!("CARGO_PKG_DESCRIPTION")) .about(env!("CARGO_PKG_DESCRIPTION"))
.arg(Arg::with_name("bind_address") .arg(
Arg::with_name("bind_address")
.short("a") .short("a")
.long("bind_address") .long("bind_address")
.value_name("ADDRESS") .value_name("ADDRESS")
.help("Address to bind for server") .help("Address to bind for server")
.required(true) .required(true)
.takes_value(true)) .takes_value(true),
.arg(Arg::with_name("port") )
.arg(
Arg::with_name("port")
.short("p") .short("p")
.long("port") .long("port")
.value_name("PORT") .value_name("PORT")
.help("Port to bind for server") .help("Port to bind for server")
.required(true) .required(true)
.takes_value(true)) .takes_value(true),
.arg(Arg::with_name("port_x") )
.arg(
Arg::with_name("port_x")
.short("x") .short("x")
.long("port_x") .long("port_x")
.value_name("PORT") .value_name("PORT")
.help("Port to bind for http if ssl is enabled to redirect to https") .help("Port to bind for http if ssl is enabled to redirect to https")
.required(false) .required(false)
.takes_value(true)) .takes_value(true),
.arg(Arg::with_name("static_path") )
.arg(
Arg::with_name("static_path")
.short("s") .short("s")
.long("static_path") .long("static_path")
.value_name("DIR") .value_name("DIR")
.help("Path of directory with index.html") .help("Path of directory with index.html")
.required(true) .required(true)
.takes_value(true)) .takes_value(true),
.arg(Arg::with_name("config") )
.arg(
Arg::with_name("config")
.short("c") .short("c")
.long("config") .long("config")
.value_name("FILE") .value_name("FILE")
.help("Path to config file") .help("Path to config file")
.required(true) .required(true)
.takes_value(true)) .takes_value(true),
)
.get_matches(); .get_matches();
let conf = matches.value_of("config").unwrap().to_owned(); let conf = matches.value_of("config").unwrap().to_owned();
let conf = std::fs::read_to_string(conf).expect("Failed to read config"); let conf = std::fs::read_to_string(conf).expect("Failed to read config");
let config = serde_json::from_str::<ConfigFile>(&conf).expect(r" let config = serde_json::from_str::<ConfigFile>(&conf).expect(
r"
Config File is corrupt. Config File is corrupt.
Config file must have following fields Config file must have following fields
@ -90,15 +101,15 @@ Config file must have following fields
- ssl_cert: Path to certificate of ssl - ssl_cert: Path to certificate of ssl
- ssl_key: Path to private key of ssl - ssl_key: Path to private key of ssl
- logger_pattern: Pattern to make log according to Actix Logger - logger_pattern: Pattern to make log according to Actix Logger
"); ",
);
Config { Config {
static_path: matches.value_of("static_path").unwrap().to_owned(), static_path: matches.value_of("static_path").unwrap().to_owned(),
bind_address: matches.value_of("bind_address").unwrap().to_owned(), bind_address: matches.value_of("bind_address").unwrap().to_owned(),
port: matches.value_of("port").unwrap().to_owned(), port: matches.value_of("port").unwrap().to_owned(),
port_x: matches.value_of("port_x").unwrap_or("").to_owned(), port_x: matches.value_of("port_x").unwrap_or("").to_owned(),
config config,
} }
} }
} }

View File

@ -24,4 +24,3 @@ impl fmt::Display for KakshFullError {
write!(f, "No space left for more user!") write!(f, "No space left for more user!")
} }
} }

View File

@ -15,8 +15,8 @@
along with Lupt. If not, see <https://www.gnu.org/licenses/> along with Lupt. If not, see <https://www.gnu.org/licenses/>
*/ */
use std::fmt;
use std::error::Error; use std::error::Error;
use std::fmt;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct AlreadyExistError; pub struct AlreadyExistError;
@ -27,6 +27,4 @@ impl fmt::Display for AlreadyExistError {
} }
} }
impl Error for AlreadyExistError { impl Error for AlreadyExistError {}
}

View File

@ -27,23 +27,24 @@
#[macro_use] #[macro_use]
extern crate lazy_static; extern crate lazy_static;
use actix_web::{
App, Error, HttpRequest, HttpResponse, HttpServer, middleware::Logger, web,
client::{Client, Connector}
};
use actix_files as fs; use actix_files as fs;
use actix_ratelimit::{MemoryStore, MemoryStoreActor, RateLimiter};
use actix_web::{
client::{Client, Connector},
middleware::Logger,
web, App, Error, HttpRequest, HttpResponse, HttpServer,
};
use actix_web_actors::ws; use actix_web_actors::ws;
use actix_ratelimit::{RateLimiter, MemoryStore, MemoryStoreActor};
use openssl::ssl::{SslAcceptor, SslAcceptorBuilder, SslConnector, SslFiletype, SslMethod}; use openssl::ssl::{SslAcceptor, SslAcceptorBuilder, SslConnector, SslFiletype, SslMethod};
use ws_sansad::WsSansad;
use std::sync::RwLock; use std::sync::RwLock;
use ws_sansad::WsSansad;
mod broker_messages;
mod chat_pinnd;
mod config; mod config;
mod errors; mod errors;
mod broker_messages;
mod ws_sansad;
mod chat_pinnd;
mod validator; mod validator;
mod ws_sansad;
lazy_static! { lazy_static! {
pub static ref SALT: RwLock<String> = RwLock::new(String::new()); pub static ref SALT: RwLock<String> = RwLock::new(String::new());
@ -67,30 +68,42 @@ async fn main() -> std::io::Result<()> {
let port_x = config.port_x.clone(); let port_x = config.port_x.clone();
let port = config.port.clone(); let port = config.port.clone();
if ssl_builder.is_some() && config.port_x != "" { if ssl_builder.is_some() && config.port_x != "" {
redirect = Some(HttpServer::new(move || { redirect = Some(
HttpServer::new(move || {
App::new() App::new()
.wrap( .wrap(
RateLimiter::new( RateLimiter::new(
MemoryStoreActor::from(MemoryStore::new().clone()).start()) MemoryStoreActor::from(MemoryStore::new().clone()).start(),
.with_interval(std::time::Duration::from_secs(60))
.with_max_requests(100)
) )
.wrap(actix_web_middleware_redirect_https::RedirectHTTPS::with_replacements(&[(port_x.clone(), port.clone())])) .with_interval(std::time::Duration::from_secs(60))
.route("/", web::get().to(|| HttpResponse::Ok() .with_max_requests(100),
)
.wrap(
actix_web_middleware_redirect_https::RedirectHTTPS::with_replacements(&[(
port_x.clone(),
port.clone(),
)]),
)
.route(
"/",
web::get().to(|| {
HttpResponse::Ok()
.content_type("text/plain") .content_type("text/plain")
.body("Always HTTPS on non-default ports!"))) .body("Always HTTPS on non-default ports!")
}),
)
}) })
.bind(format!("{}:{}", config.bind_address, config.port_x))? .bind(format!("{}:{}", config.bind_address, config.port_x))?
.run()); .run(),
);
} }
let server = HttpServer::new(move || { let server = HttpServer::new(move || {
App::new() App::new()
.wrap( .wrap(
RateLimiter::new( RateLimiter::new(MemoryStoreActor::from(MemoryStore::new().clone()).start())
MemoryStoreActor::from(MemoryStore::new().clone()).start())
.with_interval(std::time::Duration::from_secs(60)) .with_interval(std::time::Duration::from_secs(60))
.with_max_requests(200) .with_max_requests(200),
) )
.wrap(Logger::new(&logger_pattern)) .wrap(Logger::new(&logger_pattern))
.service(web::resource("/ws/").route(web::get().to(ws_index))) .service(web::resource("/ws/").route(web::get().to(ws_index)))
@ -100,10 +113,18 @@ async fn main() -> std::io::Result<()> {
}); });
if ssl_builder.is_some() && config.port_x != "" { if ssl_builder.is_some() && config.port_x != "" {
let srv = server.bind_openssl(format!("{}:{}", config.bind_address, config.port), ssl_builder.unwrap())?.run(); let srv = server
.bind_openssl(
format!("{}:{}", config.bind_address, config.port),
ssl_builder.unwrap(),
)?
.run();
tokio::try_join!(redirect.unwrap(), srv)?; tokio::try_join!(redirect.unwrap(), srv)?;
} else { } else {
server.bind(format!("{}:{}", config.bind_address, config.port))?.run().await?; server
.bind(format!("{}:{}", config.bind_address, config.port))?
.run()
.await?;
} }
Ok(()) Ok(())
@ -116,31 +137,38 @@ async fn ws_index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse
async fn gif(req: HttpRequest) -> Result<HttpResponse, Error> { async fn gif(req: HttpRequest) -> Result<HttpResponse, Error> {
let name = req.match_info().get("query").unwrap_or(""); let name = req.match_info().get("query").unwrap_or("");
let mut pos = req.match_info().get("pos").unwrap_or(""); let mut pos = req.match_info().get("pos").unwrap_or("");
if pos == "_" { pos = "" } if pos == "_" {
pos = ""
}
let builder = SslConnector::builder(SslMethod::tls()).unwrap(); let builder = SslConnector::builder(SslMethod::tls()).unwrap();
let client = Client::builder() let client = Client::builder()
.connector(Connector::new().ssl(builder.build()).finish()) .connector(Connector::new().ssl(builder.build()).finish())
.finish(); .finish();
let url = format!(
let url = format!("https://g.tenor.com/v1/search?q={}&key={}&limit=20&media_filter=tinygif&pos={}", name.replace(" ", "+"), TENOR_API_KEY.read().unwrap(), pos); "https://g.tenor.com/v1/search?q={}&key={}&limit=20&media_filter=tinygif&pos={}",
let response = client.get(url) name.replace(" ", "+"),
TENOR_API_KEY.read().unwrap(),
pos
);
let response = client
.get(url)
.header("User-Agent", "actix-web/3.0") .header("User-Agent", "actix-web/3.0")
.send() .send()
.await? .await?
.body() .body()
.await?; .await?;
Ok(HttpResponse::Ok().content_type("application/json").body(response)) Ok(HttpResponse::Ok()
.content_type("application/json")
.body(response))
} }
fn generate_ssl_builder(key: String, cert: String) -> Option<SslAcceptorBuilder> { fn generate_ssl_builder(key: String, cert: String) -> Option<SslAcceptorBuilder> {
if key != "" && cert != "" { if key != "" && cert != "" {
let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
builder builder.set_private_key_file(key, SslFiletype::PEM).unwrap();
.set_private_key_file(key, SslFiletype::PEM)
.unwrap();
builder.set_certificate_chain_file(cert).unwrap(); builder.set_certificate_chain_file(cert).unwrap();
Some(builder) Some(builder)
} else { } else {

View File

@ -71,7 +71,6 @@ fn no_hashtag(dat: &str, entry_name: &str) -> Option<String> {
} }
} }
fn no_and_or_question(dat: &str, entry_name: &str) -> Option<String> { fn no_and_or_question(dat: &str, entry_name: &str) -> Option<String> {
if dat.contains("&") { if dat.contains("&") {
Some(format!("{} shounld not have &", entry_name)) Some(format!("{} shounld not have &", entry_name))

View File

@ -17,7 +17,6 @@
use super::*; use super::*;
/// notify someone got connected /// notify someone got connected
impl Handler<ms::sansad::WsConnected> for WsSansad { impl Handler<ms::sansad::WsConnected> for WsSansad {
type Result = (); type Result = ();
@ -34,7 +33,11 @@ impl Handler<ms::sansad::WsConnected> for WsSansad {
/// notify got connected as random person /// notify got connected as random person
impl Handler<ms::sansad::WsConnectedRandom> for WsSansad { impl Handler<ms::sansad::WsConnectedRandom> for WsSansad {
type Result = (); type Result = ();
fn handle(&mut self, msg: ms::sansad::WsConnectedRandom, ctx: &mut Self::Context) -> Self::Result { fn handle(
&mut self,
msg: ms::sansad::WsConnectedRandom,
ctx: &mut Self::Context,
) -> Self::Result {
self.isthiti = Isthiti::Kaksh(msg.kaksh_kunjika); self.isthiti = Isthiti::Kaksh(msg.kaksh_kunjika);
let json = json!({ let json = json!({
"cmd": "random", "cmd": "random",
@ -177,4 +180,3 @@ impl Handler<ms::sansad::WsResponse> for WsSansad {
ctx.text(json.to_string()); ctx.text(json.to_string());
} }
} }

View File

@ -18,7 +18,6 @@
use super::*; use super::*;
impl WsSansad { impl WsSansad {
/// send text to vayakti in kaksh /// send text to vayakti in kaksh
pub async fn send_text(&mut self, val: Value) { pub async fn send_text(&mut self, val: Value) {
// check if vayakti exist // check if vayakti exist
@ -43,17 +42,19 @@ impl WsSansad {
self.send_err_response("Invalid request"); self.send_err_response("Invalid request");
return; return;
} }
}.as_str().unwrap().to_owned(); }
.as_str()
.unwrap()
.to_owned();
let reply: Option<String> = match val.get("reply") { let reply: Option<String> = match val.get("reply") {
Some(val) => Some(val.as_str().unwrap().to_owned()), Some(val) => Some(val.as_str().unwrap().to_owned()),
None => None None => None,
}; };
let kaksh_kunjika = match &self.isthiti { let kaksh_kunjika = match &self.isthiti {
Isthiti::Kaksh(kaksh_kunjika) => { Isthiti::Kaksh(kaksh_kunjika) => kaksh_kunjika.to_owned(),
kaksh_kunjika.to_owned() _ => {
}, _ => {
return; return;
} }
}; };
@ -61,11 +62,10 @@ impl WsSansad {
kaksh_kunjika, kaksh_kunjika,
kunjika: self.kunjika.to_owned(), kunjika: self.kunjika.to_owned(),
text, text,
reply reply,
}); });
} }
/// send image to vayakti in kaksh /// send image to vayakti in kaksh
pub async fn send_image(&mut self, val: Value) { pub async fn send_image(&mut self, val: Value) {
// check if vayakti exist // check if vayakti exist
@ -90,23 +90,24 @@ impl WsSansad {
self.send_err_response("Invalid request"); self.send_err_response("Invalid request");
return; return;
} }
}.as_str().unwrap().to_owned(); }
.as_str()
.unwrap()
.to_owned();
let kaksh_kunjika = match &self.isthiti { let kaksh_kunjika = match &self.isthiti {
Isthiti::Kaksh(kaksh_kunjika) => { Isthiti::Kaksh(kaksh_kunjika) => kaksh_kunjika.to_owned(),
kaksh_kunjika.to_owned() _ => {
}, _ => {
return; return;
} }
}; };
Broker::<SystemBroker>::issue_async(ms::pind::SendImage { Broker::<SystemBroker>::issue_async(ms::pind::SendImage {
kaksh_kunjika, kaksh_kunjika,
kunjika: self.kunjika.to_owned(), kunjika: self.kunjika.to_owned(),
src src,
}); });
} }
/// send reaction to vayakti in kaksh /// send reaction to vayakti in kaksh
pub async fn send_reaction(&mut self, val: Value) { pub async fn send_reaction(&mut self, val: Value) {
// check if vayakti exist // check if vayakti exist
@ -131,7 +132,10 @@ impl WsSansad {
self.send_err_response("Invalid request"); self.send_err_response("Invalid request");
return; return;
} }
}.as_str().unwrap().to_owned(); }
.as_str()
.unwrap()
.to_owned();
// sent emoji // sent emoji
let msg_id = match val.get("msg_id") { let msg_id = match val.get("msg_id") {
@ -140,12 +144,14 @@ impl WsSansad {
self.send_err_response("Invalid request"); self.send_err_response("Invalid request");
return; return;
} }
}.as_str().unwrap().to_owned(); }
.as_str()
.unwrap()
.to_owned();
let kaksh_kunjika = match &self.isthiti { let kaksh_kunjika = match &self.isthiti {
Isthiti::Kaksh(kaksh_kunjika) => { Isthiti::Kaksh(kaksh_kunjika) => kaksh_kunjika.to_owned(),
kaksh_kunjika.to_owned() _ => {
}, _ => {
return; return;
} }
}; };
@ -153,7 +159,7 @@ impl WsSansad {
kaksh_kunjika, kaksh_kunjika,
kunjika: self.kunjika.to_owned(), kunjika: self.kunjika.to_owned(),
emoji, emoji,
msg_id msg_id,
}); });
} }
@ -181,18 +187,20 @@ impl WsSansad {
self.send_err_response("Invalid request"); self.send_err_response("Invalid request");
return; return;
} }
}.as_str().unwrap().to_owned(); }
.as_str()
.unwrap()
.to_owned();
let kaksh_kunjika = match &self.isthiti { let kaksh_kunjika = match &self.isthiti {
Isthiti::Kaksh(kaksh_kunjika) => { Isthiti::Kaksh(kaksh_kunjika) => kaksh_kunjika.to_owned(),
kaksh_kunjika.to_owned() _ => {
}, _ => {
return; return;
} }
}; };
Broker::<SystemBroker>::issue_async(ms::pind::SendStatus { Broker::<SystemBroker>::issue_async(ms::pind::SendStatus {
kaksh_kunjika, kaksh_kunjika,
kunjika: self.kunjika.to_owned(), kunjika: self.kunjika.to_owned(),
status status,
}); });
} }
@ -222,7 +230,9 @@ impl WsSansad {
self.send_err_response("Invalid request"); self.send_err_response("Invalid request");
return; return;
} }
}.as_array().unwrap(); }
.as_array()
.unwrap();
for id in ids { for id in ids {
msg_id.push(id.as_str().unwrap().to_owned()); msg_id.push(id.as_str().unwrap().to_owned());
@ -230,9 +240,8 @@ impl WsSansad {
drop(ids); drop(ids);
let kaksh_kunjika = match &self.isthiti { let kaksh_kunjika = match &self.isthiti {
Isthiti::Kaksh(kaksh_kunjika) => { Isthiti::Kaksh(kaksh_kunjika) => kaksh_kunjika.to_owned(),
kaksh_kunjika.to_owned() _ => {
}, _ => {
return; return;
} }
}; };
@ -240,7 +249,7 @@ impl WsSansad {
Broker::<SystemBroker>::issue_async(ms::pind::DeleteMsg { Broker::<SystemBroker>::issue_async(ms::pind::DeleteMsg {
kaksh_kunjika, kaksh_kunjika,
kunjika: self.kunjika.to_owned(), kunjika: self.kunjika.to_owned(),
msg_id msg_id,
}); });
} }
@ -268,7 +277,10 @@ impl WsSansad {
self.send_err_response("Invalid request"); self.send_err_response("Invalid request");
return; return;
} }
}.as_str().unwrap().to_owned(); }
.as_str()
.unwrap()
.to_owned();
// msg_id // msg_id
let msg_id = match val.get("msg_id") { let msg_id = match val.get("msg_id") {
@ -277,12 +289,14 @@ impl WsSansad {
self.send_err_response("Invalid request"); self.send_err_response("Invalid request");
return; return;
} }
}.as_str().unwrap().to_owned(); }
.as_str()
.unwrap()
.to_owned();
let kaksh_kunjika = match &self.isthiti { let kaksh_kunjika = match &self.isthiti {
Isthiti::Kaksh(kaksh_kunjika) => { Isthiti::Kaksh(kaksh_kunjika) => kaksh_kunjika.to_owned(),
kaksh_kunjika.to_owned() _ => {
}, _ => {
return; return;
} }
}; };
@ -290,7 +304,7 @@ impl WsSansad {
kaksh_kunjika, kaksh_kunjika,
kunjika: self.kunjika.to_owned(), kunjika: self.kunjika.to_owned(),
text, text,
msg_id msg_id,
}); });
} }
} }

View File

@ -18,8 +18,8 @@
//! Ws Sansad manage websocket of each client //! Ws Sansad manage websocket of each client
mod handlers; mod handlers;
mod users;
mod messages; mod messages;
mod users;
use actix::prelude::*; use actix::prelude::*;
use actix_broker::{Broker, SystemBroker}; use actix_broker::{Broker, SystemBroker};
@ -27,7 +27,12 @@ use actix_web_actors::ws;
use serde_json::{json, Value}; use serde_json::{json, Value};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use crate::{chat_pinnd::ChatPinnd, broker_messages as ms, broker_messages::util::Resp, validator::{Validation as vl, validate}}; use crate::{
broker_messages as ms,
broker_messages::util::Resp,
chat_pinnd::ChatPinnd,
validator::{validate, Validation as vl},
};
/// How often heartbeat pings are sent /// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
@ -51,10 +56,9 @@ pub struct WsSansad {
enum Isthiti { enum Isthiti {
None, None,
Kaksh(String), Kaksh(String),
VraktigatWaitlist VraktigatWaitlist,
} }
impl Actor for WsSansad { impl Actor for WsSansad {
type Context = ws::WebsocketContext<Self>; type Context = ws::WebsocketContext<Self>;
@ -65,13 +69,13 @@ impl Actor for WsSansad {
} }
fn stopping(&mut self, _: &mut Self::Context) -> Running { fn stopping(&mut self, _: &mut Self::Context) -> Running {
tokio::runtime::Runtime::new().unwrap() tokio::runtime::Runtime::new()
.unwrap()
.block_on(self.leave_kaksh()); // notify leaving .block_on(self.leave_kaksh()); // notify leaving
Running::Stop Running::Stop
} }
} }
/// manage stream /// manage stream
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsSansad { impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsSansad {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) { fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
@ -79,22 +83,25 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsSansad {
Ok(ws::Message::Ping(msg)) => { Ok(ws::Message::Ping(msg)) => {
ctx.ping(&msg); ctx.ping(&msg);
self.hb = Instant::now(); self.hb = Instant::now();
}, Ok(ws::Message::Pong(_)) => { }
Ok(ws::Message::Pong(_)) => {
self.hb = Instant::now(); self.hb = Instant::now();
}, Ok(ws::Message::Text(msg)) => { }
Ok(ws::Message::Text(msg)) => {
self.special_hb = Instant::now(); self.special_hb = Instant::now();
tokio::runtime::Runtime::new().unwrap() tokio::runtime::Runtime::new()
.unwrap()
.block_on(self.parse_text_handle(msg)); .block_on(self.parse_text_handle(msg));
}, Ok(ws::Message::Close(msg)) => { }
Ok(ws::Message::Close(msg)) => {
ctx.close(msg); ctx.close(msg);
ctx.stop(); ctx.stop();
} }
_ => ctx.stop() _ => ctx.stop(),
} }
} }
} }
impl WsSansad { impl WsSansad {
pub fn new() -> Self { pub fn new() -> Self {
WsSansad { WsSansad {
@ -102,7 +109,7 @@ impl WsSansad {
isthiti: Isthiti::None, isthiti: Isthiti::None,
addr: None, addr: None,
hb: Instant::now(), hb: Instant::now(),
special_hb: Instant::now() special_hb: Instant::now(),
} }
} }
@ -116,7 +123,8 @@ impl WsSansad {
// heartbeat timed out // heartbeat timed out
// stop actor // stop actor
tokio::runtime::Runtime::new().unwrap() tokio::runtime::Runtime::new()
.unwrap()
.block_on(act.leave_kaksh()); .block_on(act.leave_kaksh());
ctx.stop(); ctx.stop();
// don't try to send a ping // don't try to send a ping
@ -137,7 +145,8 @@ impl WsSansad {
// heartbeat timed out // heartbeat timed out
// stop actor // stop actor
tokio::runtime::Runtime::new().unwrap() tokio::runtime::Runtime::new()
.unwrap()
.block_on(act.leave_kaksh()); .block_on(act.leave_kaksh());
ctx.stop(); ctx.stop();
// don't try to send a ping // don't try to send a ping
@ -151,18 +160,18 @@ impl WsSansad {
async fn parse_text_handle(&mut self, msg: String) { async fn parse_text_handle(&mut self, msg: String) {
if let Ok(val) = serde_json::from_str::<Value>(&msg) { if let Ok(val) = serde_json::from_str::<Value>(&msg) {
match val.get("cmd").unwrap().as_str().unwrap() { match val.get("cmd").unwrap().as_str().unwrap() {
"join" => { self.join_kaksh(val).await }, "join" => self.join_kaksh(val).await,
"rand" => { self.join_random(val).await }, "rand" => self.join_random(val).await,
"randnext" => { self.join_random_next().await }, "randnext" => self.join_random_next().await,
"text" => { self.send_text(val).await }, "text" => self.send_text(val).await,
"img" => { self.send_image(val).await }, "img" => self.send_image(val).await,
"react" => { self.send_reaction(val).await }, "react" => self.send_reaction(val).await,
"status" => { self.send_status(val).await }, "status" => self.send_status(val).await,
"del" => { self.delete_msg(val).await }, "del" => self.delete_msg(val).await,
"edit" => { self.edit_msg(val).await }, "edit" => self.edit_msg(val).await,
"list" => { self.list().await }, "list" => self.list().await,
"leave" => { self.leave_kaksh().await }, "leave" => self.leave_kaksh().await,
_ => () _ => (),
} }
} }
} }
@ -171,7 +180,7 @@ impl WsSansad {
fn send_ok_response(&self, text: &str) { fn send_ok_response(&self, text: &str) {
self.addr.clone().unwrap().do_send(ms::sansad::WsResponse { self.addr.clone().unwrap().do_send(ms::sansad::WsResponse {
result: "Ok".to_owned(), result: "Ok".to_owned(),
message: text.to_owned() message: text.to_owned(),
}); });
} }
@ -179,7 +188,7 @@ impl WsSansad {
fn send_err_response(&self, text: &str) { fn send_err_response(&self, text: &str) {
self.addr.clone().unwrap().do_send(ms::sansad::WsResponse { self.addr.clone().unwrap().do_send(ms::sansad::WsResponse {
result: "Err".to_owned(), result: "Err".to_owned(),
message: text.to_owned() message: text.to_owned(),
}); });
} }
} }

View File

@ -23,7 +23,7 @@ impl WsSansad {
// Check is already joined // Check is already joined
match self.isthiti { match self.isthiti {
Isthiti::None => (), Isthiti::None => (),
_ => return _ => return,
} }
// is vayakti in watch list // is vayakti in watch list
@ -40,13 +40,21 @@ impl WsSansad {
return; return;
} }
}; };
if let Some(val ) = validate(vec![vl::NonEmpty, vl::NoSpace, vl::NoHashtag, vl::NoAndOrQuestion], &kunjika, "Kunjika") { if let Some(val) = validate(
vec![
vl::NonEmpty,
vl::NoSpace,
vl::NoHashtag,
vl::NoAndOrQuestion,
],
&kunjika,
"Kunjika",
) {
self.send_err_response(&val); self.send_err_response(&val);
return; return;
} }
let mut m = sha1::Sha1::new(); let mut m = sha1::Sha1::new();
m.update(format!("{}{}",kunjika, m.update(format!("{}{}", kunjika, crate::SALT.read().unwrap()).as_bytes());
crate::SALT.read().unwrap()).as_bytes());
let kunjika = base64::encode(m.digest().bytes())[..8].to_owned(); let kunjika = base64::encode(m.digest().bytes())[..8].to_owned();
// Name // Name
@ -70,7 +78,11 @@ impl WsSansad {
return; return;
} }
}; };
if let Some(val ) = validate(vec![vl::NonEmpty, vl::NoGupt, vl::NoSpace, vl::NoAndOrQuestion], &kaksh_kunjika, "Kaksh Kunjika") { if let Some(val) = validate(
vec![vl::NonEmpty, vl::NoGupt, vl::NoSpace, vl::NoAndOrQuestion],
&kaksh_kunjika,
"Kaksh Kunjika",
) {
self.send_err_response(&val); self.send_err_response(&val);
return; return;
} }
@ -79,30 +91,37 @@ impl WsSansad {
let length: Option<usize> = match val.get("length") { let length: Option<usize> = match val.get("length") {
Some(val) => match val.as_i64() { Some(val) => match val.as_i64() {
Some(val) => Some(val as usize), Some(val) => Some(val as usize),
None => None None => None,
}, },
None => None None => None,
}; };
// request // request
let result: Resp = ChatPinnd::from_registry().send(ms::pind::JoinKaksh { let result: Resp = ChatPinnd::from_registry()
.send(ms::pind::JoinKaksh {
kaksh_kunjika: kaksh_kunjika.to_owned(), kaksh_kunjika: kaksh_kunjika.to_owned(),
length, length,
addr: self.addr.clone().unwrap(), addr: self.addr.clone().unwrap(),
kunjika: kunjika.to_owned(), kunjika: kunjika.to_owned(),
name name,
}).await.unwrap(); })
.await
.unwrap();
match result { match result {
Resp::Err(err) => self.send_err_response(&err), Resp::Err(err) => self.send_err_response(&err),
Resp::Ok => { Resp::Ok => {
self.isthiti = Isthiti::Kaksh(kaksh_kunjika); self.isthiti = Isthiti::Kaksh(kaksh_kunjika);
self.addr.clone().unwrap().do_send(ms::sansad::WsKunjikaHash{ kunjika: kunjika.clone() }); self.addr
.clone()
.unwrap()
.do_send(ms::sansad::WsKunjikaHash {
kunjika: kunjika.clone(),
});
self.kunjika = kunjika; self.kunjika = kunjika;
self.send_ok_response("joined") self.send_ok_response("joined")
} }
_ => () _ => (),
} }
} }
@ -114,7 +133,8 @@ impl WsSansad {
Isthiti::VraktigatWaitlist => { Isthiti::VraktigatWaitlist => {
self.send_ok_response("watchlist"); self.send_ok_response("watchlist");
return; return;
}, Isthiti::Kaksh(_) => return }
Isthiti::Kaksh(_) => return,
} }
// Kunjika // Kunjika
@ -125,13 +145,21 @@ impl WsSansad {
return; return;
} }
}; };
if let Some(val ) = validate(vec![vl::NonEmpty, vl::NoSpace, vl::NoHashtag, vl::NoAndOrQuestion], &kunjika, "Kunjika") { if let Some(val) = validate(
vec![
vl::NonEmpty,
vl::NoSpace,
vl::NoHashtag,
vl::NoAndOrQuestion,
],
&kunjika,
"Kunjika",
) {
self.send_err_response(&val); self.send_err_response(&val);
return; return;
} }
let mut m = sha1::Sha1::new(); let mut m = sha1::Sha1::new();
m.update(format!("{}{}",kunjika, m.update(format!("{}{}", kunjika, crate::SALT.read().unwrap()).as_bytes());
crate::SALT.read().unwrap()).as_bytes());
let kunjika = base64::encode(m.digest().bytes())[..8].to_owned(); let kunjika = base64::encode(m.digest().bytes())[..8].to_owned();
// Name // Name
@ -155,33 +183,44 @@ impl WsSansad {
v.push(x.to_owned()); v.push(x.to_owned());
} }
v v
},
None => {
Vec::new()
} }
None => Vec::new(),
}; };
// request // request
let result: Resp = ChatPinnd::from_registry().send(ms::pind::JoinRandom{ let result: Resp = ChatPinnd::from_registry()
.send(ms::pind::JoinRandom {
addr: self.addr.clone().unwrap(), addr: self.addr.clone().unwrap(),
kunjika: kunjika.to_owned(), kunjika: kunjika.to_owned(),
name, name,
tags tags,
}).await.unwrap(); })
.await
.unwrap();
match result { match result {
Resp::Err(err) => self.send_err_response(&err), Resp::Err(err) => self.send_err_response(&err),
Resp::Ok => { Resp::Ok => {
self.addr.clone().unwrap().do_send(ms::sansad::WsKunjikaHash{ kunjika: kunjika.clone() }); self.addr
.clone()
.unwrap()
.do_send(ms::sansad::WsKunjikaHash {
kunjika: kunjika.clone(),
});
self.kunjika = kunjika; self.kunjika = kunjika;
}, }
Resp::None => { Resp::None => {
self.addr.clone().unwrap().do_send(ms::sansad::WsResponse { self.addr.clone().unwrap().do_send(ms::sansad::WsResponse {
result: "watch".to_owned(), result: "watch".to_owned(),
message: "Watchlist".to_owned() message: "Watchlist".to_owned(),
}); });
self.isthiti = Isthiti::VraktigatWaitlist; self.isthiti = Isthiti::VraktigatWaitlist;
self.addr.clone().unwrap().do_send(ms::sansad::WsKunjikaHash{ kunjika: kunjika.clone() }); self.addr
.clone()
.unwrap()
.do_send(ms::sansad::WsKunjikaHash {
kunjika: kunjika.clone(),
});
self.kunjika = kunjika self.kunjika = kunjika
} }
} }
@ -194,7 +233,7 @@ impl WsSansad {
Isthiti::VraktigatWaitlist => { Isthiti::VraktigatWaitlist => {
self.send_ok_response("watchlist"); self.send_ok_response("watchlist");
return; return;
}, }
Isthiti::Kaksh(kaksh_kunjika) => kaksh_kunjika, Isthiti::Kaksh(kaksh_kunjika) => kaksh_kunjika,
Isthiti::None => { Isthiti::None => {
self.send_ok_response("Not allowed"); self.send_ok_response("Not allowed");
@ -203,22 +242,25 @@ impl WsSansad {
}; };
// request // request
let result: Resp = ChatPinnd::from_registry().send(ms::pind::JoinRandomNext { let result: Resp = ChatPinnd::from_registry()
.send(ms::pind::JoinRandomNext {
kunjika: self.kunjika.to_owned(), kunjika: self.kunjika.to_owned(),
kaksh_kunjika: kaksh_kunjika.to_owned(), kaksh_kunjika: kaksh_kunjika.to_owned(),
}).await.unwrap(); })
.await
.unwrap();
match result { match result {
Resp::Err(err) => self.send_err_response(&err), Resp::Err(err) => self.send_err_response(&err),
Resp::None => { Resp::None => {
self.addr.clone().unwrap().do_send(ms::sansad::WsResponse { self.addr.clone().unwrap().do_send(ms::sansad::WsResponse {
result: "watch".to_owned(), result: "watch".to_owned(),
message: "Watchlist".to_owned() message: "Watchlist".to_owned(),
}); });
self.isthiti = Isthiti::VraktigatWaitlist; self.isthiti = Isthiti::VraktigatWaitlist;
self.kunjika = self.kunjika.to_owned() self.kunjika = self.kunjika.to_owned()
} }
_ => () _ => (),
} }
} }
@ -233,14 +275,18 @@ impl WsSansad {
// check if connected to any kaksh // check if connected to any kaksh
match &self.isthiti { match &self.isthiti {
Isthiti::Kaksh(kunjika) => { Isthiti::Kaksh(kunjika) => {
let json: String = ChatPinnd::from_registry().send(ms::pind::List { let json: String = ChatPinnd::from_registry()
kaksh_kunjika: kunjika.to_owned() .send(ms::pind::List {
}).await.unwrap(); kaksh_kunjika: kunjika.to_owned(),
self.addr.clone().unwrap().do_send(ms::sansad::WsList {
json
}) })
}, .await
.unwrap();
self.addr
.clone()
.unwrap()
.do_send(ms::sansad::WsList { json })
}
_ => { _ => {
self.send_err_response("Kaksh not connected"); self.send_err_response("Kaksh not connected");
return; return;
@ -252,13 +298,13 @@ impl WsSansad {
pub async fn leave_kaksh(&mut self) { pub async fn leave_kaksh(&mut self) {
let kaksh_kunjika = match &self.isthiti { let kaksh_kunjika = match &self.isthiti {
Isthiti::Kaksh(val) => Some(val.to_owned()), Isthiti::Kaksh(val) => Some(val.to_owned()),
_ => None _ => None,
}; };
Broker::<SystemBroker>::issue_async(ms::pind::LeaveVayakti { Broker::<SystemBroker>::issue_async(ms::pind::LeaveVayakti {
kaksh_kunjika, kaksh_kunjika,
kunjika: self.kunjika.to_owned(), kunjika: self.kunjika.to_owned(),
addr: self.addr.clone().unwrap() addr: self.addr.clone().unwrap(),
}); });
self.isthiti = Isthiti::None; self.isthiti = Isthiti::None;