extern crate tokio; extern crate futures; //use futures::prelude::*; use futures::sync::mpsc::UnboundedSender; use std::net::SocketAddr; use tokio::prelude::*; use tokio::io; use std::iter; use std::collections::HashMap; use std::io::BufReader; use tokio::net::TcpListener; use std::sync::{Arc, Mutex}; pub mod models; pub mod user; pub mod channel; pub mod context; use context::Context; fn main() { let addr = "127.0.0.1:6667".parse().unwrap(); let listener = TcpListener::bind(&addr) .expect("unable to bind TCP listener"); let mut context = Context::new(); let server = listener.incoming() .map_err(|e| eprintln!("accept failed = {:?}", e)) .for_each(move |sock| { let addr = sock.peer_addr().unwrap(); println!("{} connected", addr); let (reader, writer) = sock.split(); let (tx, rx) = futures::sync::mpsc::unbounded(); context.connect(addr, tx); let context_inner = context.clone(); let reader = BufReader::new(reader); let iter = stream::iter_ok::<_, io::Error>(iter::repeat(())); let socket_reader = iter.fold(reader, move |reader, _| { let mut context = context_inner.clone(); io::read_until(reader, b'\n', vec![]) .and_then(|(reader, vec)| { if vec.len() == 0 { Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")) } else { Ok((reader, vec)) } }) .map(|(reader, vec)| { match String::from_utf8(vec) { Ok(s) => (reader, s), Err(e) => { eprintln!("failed converting data: {:?}", e); (reader, "ERROR".to_string()) }, } }) .map(move |(reader, line)| { use models::IrcMessage; for l in line.lines() { println!("{} -> {}", addr, l.trim()); } let res = IrcMessage::from_str(&line); if let Err(e) = res { eprintln!("error parsing command: {:?}", e); return reader; } let msg = res.unwrap(); use models::IrcCommand::*; match msg.command { NICK => { if msg.params.len() < 1 { context.send_message_to(&addr, IrcMessage::new(Some("plankircd"), Num(431), vec!["*", "No nickname given"])); } else { let nick = &msg.params[0]; if context.nick_exists(nick) { context.send_message_to(&addr, IrcMessage::new(Some("plankircd"), Num(443), vec!["*", nick, "Nickname is already in use"])); } else { let (valid, old) = { let mut lock = context.users.lock().unwrap(); let mut user = lock.get_mut(&addr).unwrap(); let old = user.nickname.clone(); user.set_nick(nick); (user.is_valid(), old) }; if valid { if !context.is_registered(&addr) { context.register_user(&addr); } else { context.send_message_to(&addr, IrcMessage::new(Some(&old), NICK, vec![nick])); } } } } }, USER => { if msg.params.len() < 4 { } else { let valid = { let mut lock = context.users.lock().unwrap(); let mut user = lock.get_mut(&addr).unwrap(); user.set_user(&msg.params[0], &msg.params[2], &msg.params[3]); user.is_valid() }; if valid && !context.is_registered(&addr) { context.register_user(&addr); } } }, PING => {context.send_message_to(&addr, IrcMessage::new(Some("plankircd"), PONG, vec!["plankircd", &msg.params[0]]));}, PRIVMSG => { if msg.params[0].starts_with('#') { } else { let users = context.users.lock().unwrap(); let from = &users[&addr]; if let Some(user) = users.values().find(|u| u.nickname == msg.params[0]) { user.privmsg(from, &msg.params[1]); } } } QUIT => context.disconnect(&addr), _ => println!("Unhandled command: {}", msg.command), } reader }) }) .map_err(|_|()); let socket_writer = rx.fold(writer, move |writer, msg| { for l in msg.lines() { println!("{} <- {}", addr, l.trim()); } io::write_all(writer, msg.into_bytes()) .map(|(writer, _)| writer) .map_err(|_| ()) }); let connection = socket_reader.map(|_|()).select(socket_writer.map(|_|())); // Spawn the future as a concurrent task. tokio::spawn(connection.then(move |_| { println!("Connection {} closed", addr); Ok(()) })) }); tokio::run(server); }