Skip to content

Commit

Permalink
add message types & restructure
Browse files Browse the repository at this point in the history
  • Loading branch information
lsampras committed May 22, 2021
1 parent 8ef9c2c commit f252a4d
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 131 deletions.
22 changes: 21 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,29 @@ mod torrent_meta;
mod tracker;
mod utils;
mod peer;
// step 1 parse torrent files
mod network;


pub fn test() {
let test: Vec<u8> = vec![20,1,2,3,4,5,6,7];
let mut bitfield = Vec::<bool>::new();
println!("{:?}", test.into_iter().map(|data_byte| {
vec![
data_byte & (1 << 0) != 0,
data_byte & (1 << 1) != 0,
data_byte & (1 << 2) != 0,
data_byte & (1 << 3) != 0,
data_byte & (1 << 4) != 0,
data_byte & (1 << 5) != 0,
data_byte & (1 << 6) != 0,
data_byte & (1 << 7) != 0,
]
}).flatten().collect::<Vec<bool>>());
panic!();
}

pub fn main() {
test();
let args: Vec<String> = env::args().collect();
let filename = &args[1];
let metadata = torrent_meta::TorrentMetadata::from_file(filename);
Expand Down
45 changes: 45 additions & 0 deletions src/network/message_parser.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use crate::utils::{parse_big_endian, convert_u8_to_bits};

pub enum Message {
Choke,
UnChoke,
Interested,
NotInterested,
Have(u32),
Bitfield(Vec<bool>),
Request(u32, u32, u32),
Piece(u32, u32, Vec<u8>),
Cancel(u32),
}

impl Message {
pub fn from_bytes(data : Vec<u8>) -> Message {
let (msg_id, msg_payload) = data.split_at(1);

match msg_id {
[0] => Message::Choke,
[1] => Message::UnChoke,
[2] => Message::Interested,
[3] => Message::NotInterested,
[4] => Message::Have(parse_big_endian(&msg_payload[0..4])),
[5] => {
Message::Bitfield(msg_payload.into_iter()
.map(|data_byte| convert_u8_to_bits(data_byte))
.flatten().collect::<Vec<bool>>())
},
[6] => Message::Request(
parse_big_endian(&msg_payload[0..4]),
parse_big_endian(&msg_payload[4..8]),
parse_big_endian(&msg_payload[8..12]),
),
[7] => Message::Piece(
parse_big_endian(&msg_payload[0..4]),
parse_big_endian(&msg_payload[4..8]),
msg_payload[8..].to_owned()
),
[8] => Message::Cancel(parse_big_endian(&msg_payload[0..4])),
_ => panic!("Unsupported msg_type {:?} with data {:?}", msg_id, data)
}
}
}

2 changes: 2 additions & 0 deletions src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod message_parser;
pub mod peer_connection;
83 changes: 83 additions & 0 deletions src/network/peer_connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use std::net::{IpAddr, Ipv4Addr};
use std::net::{TcpStream, SocketAddr};
use std::fmt::Formatter;
use std::io::{Read, Write, Error, ErrorKind};
use std::time::Duration;
use crate::torrent_meta::TorrentMetadata;
use crate::utils::{PEER_ID, PROTOCOL, parse_big_endian};
use crate::network::message_parser::Message;
use crate::peer::Peer;

pub struct PeerConnection {
peer: Peer,
stream: TcpStream,
torrent: TorrentMetadata
}

impl PeerConnection {

pub fn new(peer: Peer, torrent_meta:TorrentMetadata) -> Result<PeerConnection, Error> {
println!("Connecting to {}...", &peer);
let addr = SocketAddr::new(peer.ip, peer.port);
match TcpStream::connect_timeout(&addr, Duration::new(10,0)) {
Ok(stream_obj) => {
println!("Connected successfully to {}", &peer);

Ok(PeerConnection {
peer: peer,
stream: stream_obj,
torrent: torrent_meta
})
}
Err(e) => panic!("Failed to create a PeerConnection : {}", e)
}
}

pub fn handshake(&mut self) {

let mut message = vec![];
message.push(PROTOCOL.len() as u8);
message.extend(PROTOCOL.bytes());
message.extend(vec![0;8].into_iter());
message.extend(self.torrent.info_hash.iter().cloned());
message.extend(PEER_ID.bytes());
self.stream.write_all(&message).unwrap();

let pstrlen = self.read(1).unwrap();
let _pstr = self.read(pstrlen[0] as u32).unwrap();
let _reserved = self.read(8).unwrap();
let _info_hash = self.read(20).unwrap();
let _peer_id = self.read(20).unwrap();
println!("Received handshake");
}

pub fn fetch_data(&mut self) {

match self.read(4) {
Ok(length) => {
let payload = self.read(parse_big_endian(&length.as_slice()));
println!("Length {:?} Payload: {:?}", length, payload);
}
Err(_) => println!("No data received for {}", &self.peer)
}
}

fn read(&mut self, bytes_to_read: u32) -> Result<Vec<u8>, Error> {
let mut buf = vec![];
let stream_ref = &mut self.stream;
let mut take = stream_ref.take(bytes_to_read as u64);
let bytes_read = take.read_to_end(&mut buf);
match bytes_read {
Ok(n) => {
if (n as u32) == bytes_to_read {
Ok(buf)
} else {
Err(Error::new(ErrorKind::Other, "No data received"))
}
}
Err(e) => {
Err(e)
}
}
}
}
77 changes: 2 additions & 75 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use std::io::{Read, Write, Error, ErrorKind};
use std::time::Duration;
use crate::torrent_meta::TorrentMetadata;
use crate::utils::{PEER_ID, PROTOCOL, parse_big_endian};
use crate::network::message_parser::Message;
use crate::network::peer_connection::PeerConnection;

#[derive(Debug, Clone, PartialEq, Copy)]
pub struct Peer {
Expand Down Expand Up @@ -42,78 +44,3 @@ impl Peer {
}

}


struct PeerConnection {
peer: Peer,
stream: TcpStream,
torrent: TorrentMetadata
}

impl PeerConnection {

pub fn new(peer: Peer, torrent_meta:TorrentMetadata) -> Result<PeerConnection, Error> {
println!("Connecting to {}...", &peer);
let addr = SocketAddr::new(peer.ip, peer.port);
match TcpStream::connect_timeout(&addr, Duration::new(10,0)) {
Ok(stream_obj) => {
println!("Connected successfully to {}", &peer);

Ok(PeerConnection {
peer: peer,
stream: stream_obj,
torrent: torrent_meta
})
}
Err(e) => panic!("Failed to create a PeerConnection : {}", e)
}
}

fn handshake(&mut self) {

let mut message = vec![];
message.push(PROTOCOL.len() as u8);
message.extend(PROTOCOL.bytes());
message.extend(vec![0;8].into_iter());
message.extend(self.torrent.info_hash.iter().cloned());
message.extend(PEER_ID.bytes());
self.stream.write_all(&message).unwrap();

let pstrlen = self.read(1).unwrap();
let _pstr = self.read(pstrlen[0] as u32).unwrap();
let _reserved = self.read(8).unwrap();
let _info_hash = self.read(20).unwrap();
let _peer_id = self.read(20).unwrap();
println!("Received handshake");
}

fn fetch_data(&mut self) {

match self.read(4) {
Ok(length) => {
let payload = self.read(parse_big_endian(&length.as_slice()));
println!("Length {:?} Payload: {:?}", length, payload);
}
Err(_) => println!("No data received for {}", &self.peer)
}
}

fn read(&mut self, bytes_to_read: u32) -> Result<Vec<u8>, Error> {
let mut buf = vec![];
let stream_ref = &mut self.stream;
let mut take = stream_ref.take(bytes_to_read as u64);
let bytes_read = take.read_to_end(&mut buf);
match bytes_read {
Ok(n) => {
if (n as u32) == bytes_to_read {
Ok(buf)
} else {
Err(Error::new(ErrorKind::Other, "No data received"))
}
}
Err(e) => {
Err(e)
}
}
}
}
1 change: 1 addition & 0 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod piece;
Empty file added src/storage/piece.rs
Empty file.
19 changes: 19 additions & 0 deletions src/torrent_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

use crate::peer::Peer;
use crate::torrent_meta::TorrentMetadata;
use crate::tracker::TrackerResponse;
#[derive(Debug)]
pub struct PeerState {
pub peer_id: u8
pub peer: Peer
pub mut have: Vec<bool>,
pub mut choked: bool,
pub mut interested: bool,
}

#[derive(Debug)]
pub struct TorrentState {
torrent_meta: TorrentMetadata,
tracker_info: TrackerResponse
}

67 changes: 12 additions & 55 deletions src/torrent_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct Info {
#[serde(default)]
#[serde(rename="root hash")]
root_hash: Option<String>,
pub pieces_hash: Option<Vec<Vec<u8>>>
}

#[derive(Default, Serialize, Deserialize, Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -69,60 +70,6 @@ pub struct TorrentMetadata {
}


// #[derive(Debug, Deserialize, Serialize)]
// struct File {
// path: Vec<String>,
// length: i64,
// #[serde(default)]
// md5sum: Option<String>,
// }

// #[derive(Debug, Default, Deserialize, Serialize)]
// pub struct Info {
// name: String,
// pieces: ByteBuf,
// #[serde(rename="piece length")]
// piece_length: i64,
// #[serde(default)]
// md5sum: Option<String>,
// #[serde(default)]
// pub length: Option<i64>,
// #[serde(default)]
// files: Option<Vec<File>>,
// #[serde(default)]
// private: Option<u8>,
// #[serde(default)]
// path: Option<Vec<String>>,
// #[serde(default)]
// #[serde(rename="root hash")]
// root_hash: Option<String>,
// }

// // the Deserialize derive is provided by serde, it causes the code
// // to be generated that is needed for deserialization
// #[derive(Default, Debug, Deserialize)]
// // this tells serde you want to use kebab-case, which is
// // with dashes instead of underscores for field names
// #[serde(rename_all = "kebab-case")]
// // this tells serde to default values to the empty case if not provided
// #[serde(default)]
// pub struct Torrent {
// pub info: Info,
// pub announce: String,
// // nodes: Option<Vec<Node>>,
// encoding: Option<String>,
// httpseeds: Vec<String>,
// #[serde(rename="announce-list")]
// announce_list: Vec<Vec<String>>,
// #[serde(rename="creation date")]
// creation_date: i64,
// #[serde(rename="comment")]
// comment: String,
// #[serde(rename="created by")]
// created_by: String,
// pub info_hash: Vec<u8>,
// }

impl TorrentMetadata {

pub fn from_file(filename: &String) -> TorrentMetadata {
Expand All @@ -131,6 +78,7 @@ impl TorrentMetadata {
f.read_to_end(&mut s).unwrap();
let mut decoded: TorrentMetadata = serde_bencode::from_bytes(&s).unwrap();
decoded.info_hash = create_info_hash(&decoded.info);
decoded.info.parse_pieces_hash();
decoded
}

Expand All @@ -150,6 +98,7 @@ impl TorrentMetadata {
println!("created by:\t{:?}", self.created_by);
println!("encoding:\t{:?}", self.encoding);
println!("piece length:\t{:?}", self.info.piece_length);
println!("pieces:\t{:?}", self.info.pieces_hash);
println!("private:\t{:?}", self.info.private);
println!("root hash:\t{:?}", self.info.root_hash);
println!("md5sum:\t\t{:?}", self.info.md5sum);
Expand All @@ -164,6 +113,14 @@ impl TorrentMetadata {
}
}

impl Info {
pub fn parse_pieces_hash(&mut self) {
// let pieces = &mut self.pieces;
let hash_pieces = self.pieces.as_slice();
self.pieces_hash = Some(hash_pieces.chunks_exact(20).map(|hash| hash.to_owned()).collect());
}
}


fn create_info_hash(info: &Info) -> Vec<u8> {
let info_raw = serde_bencode::to_bytes(info).unwrap();
Expand All @@ -172,4 +129,4 @@ fn create_info_hash(info: &Info) -> Vec<u8> {
let mut hex: Vec<u8> = vec![0; 20];
hasher.result(&mut hex);
hex
}
}
Loading

0 comments on commit f252a4d

Please sign in to comment.