extern crate tokio; extern crate futures; //use futures::prelude::*; use futures::sync::mpsc::UnboundedSender; use std::net::SocketAddr; use tokio::prelude::*; use tokio::io; use std::iter; use std::collections::HashMap; use std::io::BufReader; use tokio::net::TcpListener; use std::sync::{Arc, Mutex}; pub mod models; pub mod user; pub mod channel; pub mod context; use context::Context; fn main() { let addr = "127.0.0.1:6667".parse().unwrap(); let listener = TcpListener::bind(&addr) .expect("unable to bind TCP listener"); let mut context = Context::new(); let server = listener.incoming() .map_err(|e| eprintln!("accept failed = {:?}", e)) .for_each(move |sock| { let addr = sock.peer_addr().unwrap(); println!("{} connected", addr); let (reader, writer) = sock.split(); let (tx, rx) = futures::sync::mpsc::unbounded(); { context.connections.lock().unwrap().insert(addr, tx); context.unregistered_users.lock().unwrap().insert(addr, user::User::new(addr)); } let context_inner = context.clone(); let reader = BufReader::new(reader); let iter = stream::iter_ok::<_, io::Error>(iter::repeat(())); let socket_reader = iter.fold(reader, move |reader, _| { let mut context = context_inner.clone(); io::read_until(reader, b'\n', vec![]) .and_then(|(reader, vec)| { if vec.len() == 0 { Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")) } else { Ok((reader, vec)) } }) .map(|(reader, vec)| { match String::from_utf8(vec) { Ok(s) => (reader, s), Err(e) => { eprintln!("failed converting data: {:?}", e); (reader, "ERROR".to_string()) }, } }) .map(move |(reader, line)| { use models::IrcMessage; let msg = IrcMessage::from_str(&line); let res = match msg { Some(msg) => { use models::IrcCommand::*; match msg.command { NICK => { let mut shouldreg = false; if msg.params.len() < 1 { } else { if context.is_unreg(&addr) { let mut lock = context.unregistered_users.lock().unwrap(); let mut unreg = lock.get_mut(&addr).unwrap(); let wasreg = unreg.is_registered(); unreg.set_nick(&msg.params[0]); if !wasreg && unreg.is_registered() { shouldreg = true; } } } if shouldreg { context.register_user(&addr); } Ok(()) }, USER => { let mut shouldreg = false; if msg.params.len() < 4 { } else { if context.is_unreg(&addr) { let mut lock = context.unregistered_users.lock().unwrap(); let mut unreg = lock.get_mut(&addr).unwrap(); let wasreg = unreg.is_registered(); unreg.set_user(&msg.params[0], &msg.params[2], &msg.params[3]); if !wasreg && unreg.is_registered() { shouldreg = true; } } } if shouldreg { context.register_user(&addr); } Ok(()) }, PING => context.send_message_to(&addr, IrcMessage::new(Some("plankircd"), PONG, vec!["plankircd", &msg.params[0]])), QUIT => Ok(context.disconnect(&addr)), _ => Ok(println!("Unhandled command: {} -> {}", addr, line.trim())), } }, None => Ok(eprintln!("Unrecognized command: {} -> {}", addr, line.trim())), }; if let Err(e) = res { eprintln!("error handling command {:?}", e); } reader }) }) .map_err(|_|()); let socket_writer = rx.fold(writer, move |writer, msg| { println!("{} <- {}", addr, msg); io::write_all(writer, msg.into_bytes()) .map(|(writer, _)| writer) .map_err(|_| ()) }); let connection = socket_reader.map(|_|()).select(socket_writer.map(|_|())); // Spawn the future as a concurrent task. tokio::spawn(connection.then(move |_| { println!("Connection {} closed", addr); Ok(()) })) }); tokio::run(server); }