Updated actix and related dependencies.

This commit is contained in:
Piyush मिश्रः 2023-06-18 12:04:54 +05:30
parent 170f34cd0a
commit 8326f8f284
7 changed files with 327 additions and 1323 deletions

1510
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -13,21 +13,21 @@ keywords = ["chat", "Chatting", "Talk", "Stranger"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
actix = "0.10" actix = "0.13"
actix-web = { version = "4", features = ["rustls"] } actix-web = { version = "4", features = ["rustls"] }
actix-web-actors = "3" actix-web-actors = "4"
actix-broker = "0.3" actix-broker = "0.4"
actix-files = "0.6" actix-files = "0.6"
env_logger = "0.9" awc = "3.1"
env_logger = "0.10"
openssl = "0.10" openssl = "0.10"
clap = { version = "4", features = ["derive"] } clap = { version = "4", features = ["derive"] }
lazy_static = "1.4" lazy_static = "1.4"
serde = { version = "1.0", features = ["serde_derive"] }
serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
rand = "0.8" rand = "0.8"
tokio = { version = "1.5", features = ['rt', 'rt-multi-thread', 'macros'] } futures = "0.3"
sha2 = "0.10" sha2 = "0.10"
base64 = "0.21" base64 = "0.21"

View File

@ -17,7 +17,8 @@
use crate::ws_sansad::WsSansad; use crate::ws_sansad::WsSansad;
use actix::prelude::*; use actix::prelude::*;
use dev::{MessageResponse, ResponseChannel}; use dev::MessageResponse;
// use dev::ResponseChannel;
pub mod pind; pub mod pind;
pub mod sansad; pub mod sansad;

View File

@ -15,24 +15,10 @@
along with Lupt. If not, see <https://www.gnu.org/licenses/> along with Lupt. If not, see <https://www.gnu.org/licenses/>
*/ */
use super::*; // //################################################## Helper ##################################################
//################################################## Helper ##################################################
#[derive(Debug)] #[derive(Debug)]
pub enum Resp { pub enum Resp {
Ok, Ok,
Err(String), Err(String),
None, None,
} }
impl<A, M> MessageResponse<A, M> for Resp
where
A: Actor,
M: Message<Result = Resp>,
{
fn handle<R: ResponseChannel<M>>(self, _: &mut A::Context, tx: Option<R>) {
if let Some(tx) = tx {
tx.send(self);
}
}
}

View File

@ -19,7 +19,7 @@ use super::*;
/// Join kaksh /// Join kaksh
impl Handler<ms::pind::JoinKaksh> for ChatPinnd { impl Handler<ms::pind::JoinKaksh> for ChatPinnd {
type Result = Resp; type Result = MessageResult<ms::pind::JoinKaksh>;
fn handle(&mut self, msg: ms::pind::JoinKaksh, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: ms::pind::JoinKaksh, _: &mut Self::Context) -> Self::Result {
// check if user exist // check if user exist
@ -28,7 +28,7 @@ impl Handler<ms::pind::JoinKaksh> for ChatPinnd {
.iter() .iter()
.position(|vk| vk.kunjika == msg.kunjika) .position(|vk| vk.kunjika == msg.kunjika)
{ {
return Resp::Err("Kunjika already exist".to_owned()); return MessageResult(Resp::Err("Kunjika already exist".to_owned()));
} }
if let Some(_) = self.kaksh.iter().position(|(_, g)| { if let Some(_) = self.kaksh.iter().position(|(_, g)| {
@ -37,7 +37,7 @@ impl Handler<ms::pind::JoinKaksh> for ChatPinnd {
None => false, None => false,
} }
}) { }) {
return Resp::Err("Kunjika already exist".to_owned()); return MessageResult(Resp::Err("Kunjika already exist".to_owned()));
} }
// check if kaksh exist and add user // check if kaksh exist and add user
@ -47,7 +47,7 @@ impl Handler<ms::pind::JoinKaksh> for ChatPinnd {
// check if kaksh have no space left // check if kaksh have no space left
if let Some(n) = kaksh.length { if let Some(n) = kaksh.length {
if kaksh.loog.len() >= n { if kaksh.loog.len() >= n {
return Resp::Err("Kaksh have no space".to_owned()); return MessageResult(Resp::Err("Kaksh have no space".to_owned()));
} }
} }
@ -80,7 +80,7 @@ impl Handler<ms::pind::JoinKaksh> for ChatPinnd {
} }
} }
Resp::Ok MessageResult(Resp::Ok)
} }
} }
@ -89,7 +89,7 @@ impl Handler<ms::pind::JoinKaksh> for ChatPinnd {
/// Check if watchlist is empty, if yes add the kunjika andaddr to watchlist /// Check if watchlist is empty, if yes add the kunjika andaddr to watchlist
/// if watchlist have people get 0th person an connect it /// if watchlist have people get 0th person an connect it
impl Handler<ms::pind::JoinRandom> for ChatPinnd { impl Handler<ms::pind::JoinRandom> for ChatPinnd {
type Result = Resp; type Result = MessageResult<ms::pind::JoinRandom>;
fn handle(&mut self, msg: ms::pind::JoinRandom, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: ms::pind::JoinRandom, _: &mut Self::Context) -> Self::Result {
// check if user exist // check if user exist
if let Some(_) = self if let Some(_) = self
@ -97,7 +97,7 @@ impl Handler<ms::pind::JoinRandom> for ChatPinnd {
.iter() .iter()
.position(|vk| vk.kunjika == msg.kunjika) .position(|vk| vk.kunjika == msg.kunjika)
{ {
return Resp::Err("Kunjika already exist".to_owned()); return MessageResult(Resp::Err("Kunjika already exist".to_owned()));
} }
if let Some(_) = self.kaksh.iter().position(|(_, g)| { if let Some(_) = self.kaksh.iter().position(|(_, g)| {
@ -106,7 +106,7 @@ impl Handler<ms::pind::JoinRandom> for ChatPinnd {
None => false, None => false,
} }
}) { }) {
return Resp::Err("Kunjika already exist".to_owned()); return MessageResult(Resp::Err("Kunjika already exist".to_owned()));
} }
// Check if watch list is empty // Check if watch list is empty
@ -117,7 +117,7 @@ impl Handler<ms::pind::JoinRandom> for ChatPinnd {
name: msg.name, name: msg.name,
tags: msg.tags, tags: msg.tags,
}); });
return Resp::None; return MessageResult(Resp::None);
} }
// connect person with tag // connect person with tag
@ -136,7 +136,7 @@ impl Handler<ms::pind::JoinRandom> for ChatPinnd {
name: msg.name, name: msg.name,
tags: msg.tags, tags: msg.tags,
}); });
return Resp::None; return MessageResult(Resp::None);
} }
} }
} else { } else {
@ -185,22 +185,22 @@ impl Handler<ms::pind::JoinRandom> for ChatPinnd {
kaksh_kunjika: group_kunjika, kaksh_kunjika: group_kunjika,
}); });
Resp::Ok MessageResult(Resp::Ok)
} }
} }
/// Next Random next vayakti /// Next Random next vayakti
impl Handler<ms::pind::JoinRandomNext> for ChatPinnd { impl Handler<ms::pind::JoinRandomNext> for ChatPinnd {
type Result = Resp; type Result = MessageResult<ms::pind::JoinRandomNext>;
fn handle(&mut self, msg: ms::pind::JoinRandomNext, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: ms::pind::JoinRandomNext, _: &mut Self::Context) -> Self::Result {
let kaksh = match self.kaksh.get_mut(&msg.kaksh_kunjika) { let kaksh = match self.kaksh.get_mut(&msg.kaksh_kunjika) {
Some(v) => v, Some(v) => v,
None => return Resp::Err("Failed to join, check entries!".to_owned()), None => return MessageResult(Resp::Err("Failed to join, check entries!".to_owned())),
}; };
let loog_i = match kaksh.loog.iter().position(|a| a.kunjika == msg.kunjika) { let loog_i = match kaksh.loog.iter().position(|a| a.kunjika == msg.kunjika) {
Some(v) => v, Some(v) => v,
None => return Resp::Err("Failed to join, check entries!".to_owned()), None => return MessageResult(Resp::Err("Failed to join, check entries!".to_owned())),
}; };
let addr; let addr;
@ -210,18 +210,22 @@ impl Handler<ms::pind::JoinRandomNext> for ChatPinnd {
{ {
let loog = match kaksh.loog.get(loog_i) { let loog = match kaksh.loog.get(loog_i) {
Some(v) => v, Some(v) => v,
None => return Resp::Err("Failed to join, check entries!".to_owned()), None => {
return MessageResult(Resp::Err("Failed to join, check entries!".to_owned()))
}
}; };
if let None = loog.tags { if let None = loog.tags {
return Resp::Err("You are not a randome vyakti!".to_owned()); return MessageResult(Resp::Err("You are not a randome vyakti!".to_owned()));
} }
addr = loog.addr.clone(); addr = loog.addr.clone();
name = loog.name.to_owned(); name = loog.name.to_owned();
tags = match loog.tags.clone() { tags = match loog.tags.clone() {
Some(v) => v, Some(v) => v,
None => return Resp::Err("Failed to join, check entries!".to_owned()), None => {
return MessageResult(Resp::Err("Failed to join, check entries!".to_owned()))
}
}; };
} }
@ -242,7 +246,7 @@ impl Handler<ms::pind::JoinRandomNext> for ChatPinnd {
name, name,
tags, tags,
}); });
return Resp::None; return MessageResult(Resp::None);
} }
// connect person with tag or to zero // connect person with tag or to zero
let pos = if tags.len() > 0 { let pos = if tags.len() > 0 {
@ -260,7 +264,7 @@ impl Handler<ms::pind::JoinRandomNext> for ChatPinnd {
name, name,
tags, tags,
}); });
return Resp::None; return MessageResult(Resp::None);
} }
} }
} else { } else {
@ -314,7 +318,7 @@ impl Handler<ms::pind::JoinRandomNext> for ChatPinnd {
kaksh_kunjika: group_kunjika, kaksh_kunjika: group_kunjika,
}); });
Resp::Ok MessageResult(Resp::Ok)
} }
} }

View File

@ -31,13 +31,9 @@ extern crate lazy_static;
extern crate anyhow; extern crate anyhow;
use actix_files as fs; use actix_files as fs;
use actix_ratelimit::{MemoryStore, MemoryStoreActor, RateLimiter}; use actix_web::{middleware::Logger, web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web::{
client::{Client, Connector},
middleware::Logger,
web, App, Error, HttpRequest, HttpResponse, HttpServer,
};
use actix_web_actors::ws; use actix_web_actors::ws;
use awc::Client;
use config::CONFIG; use config::CONFIG;
use rustls::{Certificate, PrivateKey, ServerConfig}; use rustls::{Certificate, PrivateKey, ServerConfig};
use std::fs::File; use std::fs::File;
@ -133,9 +129,7 @@ async fn gif(req: HttpRequest) -> Result<HttpResponse, Error> {
pos = "" pos = ""
} }
let client = Client::builder() let client = Client::default();
.connector(Connector::new().finish())
.finish();
let tenor_key = CONFIG.tenor_key.clone().unwrap(); let tenor_key = CONFIG.tenor_key.clone().unwrap();
@ -155,11 +149,13 @@ async fn gif(req: HttpRequest) -> Result<HttpResponse, Error> {
let response = client let response = client
.get(url) .get(url)
.header("User-Agent", "actix-web/3.0") .insert_header(("User-Agent", "actix-web/3.0"))
.send() .send()
.await? .await
.unwrap()
.body() .body()
.await?; .await
.unwrap(); // need handle errors
Ok(HttpResponse::Ok() Ok(HttpResponse::Ok()
.content_type("application/json") .content_type("application/json")

View File

@ -39,11 +39,6 @@ const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout /// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(15); const CLIENT_TIMEOUT: Duration = Duration::from_secs(15);
/// How often heartbeat pings are sent
const SPECIAL_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(3 * 60);
/// How long before lack of client response causes a timeout
const SPECIAL_CLIENT_TIMEOUT: Duration = Duration::from_secs(15 * 60);
pub struct WsSansad { pub struct WsSansad {
kunjika: String, kunjika: String,
isthiti: Isthiti, isthiti: Isthiti,
@ -65,13 +60,11 @@ impl Actor for WsSansad {
fn started(&mut self, ctx: &mut Self::Context) { fn started(&mut self, ctx: &mut Self::Context) {
self.addr = Some(ctx.address().clone()); // own addr self.addr = Some(ctx.address().clone()); // own addr
self.hb(ctx); self.hb(ctx);
self.special_hb(ctx);
} }
fn stopping(&mut self, _: &mut Self::Context) -> Running { fn stopping(&mut self, _: &mut Self::Context) -> Running {
tokio::runtime::Runtime::new() futures::executor::block_on(self.leave_kaksh());
.unwrap()
.block_on(self.leave_kaksh()); // notify leaving
Running::Stop Running::Stop
} }
} }
@ -89,9 +82,10 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsSansad {
} }
Ok(ws::Message::Text(msg)) => { Ok(ws::Message::Text(msg)) => {
self.special_hb = Instant::now(); self.special_hb = Instant::now();
tokio::runtime::Runtime::new() // tokio::runtime::Runtime::new()
.unwrap() // .unwrap()
.block_on(self.parse_text_handle(msg)); // .block_on(self.parse_text_handle(msg.to_string()));
futures::executor::block_on(self.parse_text_handle(msg.to_string()));
} }
Ok(ws::Message::Close(msg)) => { Ok(ws::Message::Close(msg)) => {
ctx.close(msg); ctx.close(msg);
@ -123,9 +117,7 @@ impl WsSansad {
// heartbeat timed out // heartbeat timed out
// stop actor // stop actor
tokio::runtime::Runtime::new() futures::executor::block_on(act.leave_kaksh());
.unwrap()
.block_on(act.leave_kaksh());
ctx.stop(); ctx.stop();
// don't try to send a ping // don't try to send a ping
return; return;
@ -135,27 +127,6 @@ impl WsSansad {
}); });
} }
/// helper method that sends ping to client every second.
///
/// also this method checks heartbeats from client
fn special_hb(&self, ctx: &mut <Self as Actor>::Context) {
ctx.run_interval(SPECIAL_HEARTBEAT_INTERVAL, |act, ctx| {
// check client heartbeats
if Instant::now().duration_since(act.special_hb) > SPECIAL_CLIENT_TIMEOUT {
// heartbeat timed out
// stop actor
tokio::runtime::Runtime::new()
.unwrap()
.block_on(act.leave_kaksh());
ctx.stop();
// don't try to send a ping
return;
}
ctx.ping(b"");
});
}
/// parse the request text from client /// parse the request text from client
async fn parse_text_handle(&mut self, msg: String) { async fn parse_text_handle(&mut self, msg: String) {
if let Ok(val) = serde_json::from_str::<Value>(&msg) { if let Ok(val) = serde_json::from_str::<Value>(&msg) {