diff options
Diffstat (limited to 'net/rabbiteer/files/patch-src_client.rs')
-rw-r--r-- | net/rabbiteer/files/patch-src_client.rs | 266 |
1 files changed, 0 insertions, 266 deletions
diff --git a/net/rabbiteer/files/patch-src_client.rs b/net/rabbiteer/files/patch-src_client.rs deleted file mode 100644 index 7951ad581d21..000000000000 --- a/net/rabbiteer/files/patch-src_client.rs +++ /dev/null @@ -1,266 +0,0 @@ ---- src/client.rs.orig 2018-10-14 20:00:28 UTC -+++ src/client.rs -@@ -1,14 +1,14 @@ --use std::io; --use error::RbtError; --use amqp::{self, Session, Options, Channel}; --use amqp::protocol::basic::{Deliver, BasicProperties}; -+use amqp::protocol::basic::{BasicProperties, Deliver}; - use amqp::Basic; -+use amqp::{self, Channel, Options, Session}; - use amqp::{Table, TableEntry}; -+use error::RbtError; -+use std::io; - -+use std::error::Error; -+use std::sync::mpsc; - use std::thread; - use std::time::Duration; --use std::sync::mpsc; --use std::error::Error; - - pub struct Sendable { - pub exchange: String, -@@ -18,21 +18,20 @@ pub struct Sendable { - pub file_name: String, - pub reader: Box<io::Read>, - pub priority: u8, -- pub rpctimeout: u64 -+ pub rpctimeout: u64, - } - --pub type ReceiveCb = FnMut(&mut Channel, Deliver, BasicProperties, Vec<u8>) -> Result<(), RbtError> + Send; -+pub type ReceiveCb = -+ FnMut(&mut Channel, Deliver, BasicProperties, Vec<u8>) -> Result<(), RbtError> + Send; - - pub struct Receiver { -- pub exchange:String, -+ pub exchange: String, - pub routing_key: Option<String>, - pub auto_ack: bool, -- pub callback:Box<ReceiveCb>, -+ pub callback: Box<ReceiveCb>, - } - -- --pub fn open_send(o:Options, s:Sendable, r:Option<Receiver>) -> Result<(),RbtError> { -- -+pub fn open_send(o: Options, s: Sendable, r: Option<Receiver>) -> Result<(), RbtError> { - // open the channel - let (mut session, mut channel) = _open(o)?; - -@@ -49,7 +48,10 @@ pub fn open_send(o:Options, s:Sendable, r:Option<Recei - - // put filename in headers if we read from file - if s.file_name != "-" && !headers.contains_key("fileName") { -- headers.insert("fileName".to_owned(), TableEntry::LongString(String::from(s.file_name))); -+ headers.insert( -+ "fileName".to_owned(), -+ TableEntry::LongString(String::from(s.file_name)), -+ ); - } - - // send properties -@@ -73,8 +75,8 @@ pub fn open_send(o:Options, s:Sendable, r:Option<Recei - props.correlation_id = Some("rabbiteer here".to_owned()); - - true -- }, -- None => false -+ } -+ None => false, - }; - - // read input input buffer -@@ -87,12 +89,14 @@ pub fn open_send(o:Options, s:Sendable, r:Option<Recei - - if isrpc { - let (tx, rx) = mpsc::channel(); -- thread::Builder::new().name("consumer_thread".to_string()).spawn(move || { -- channel.start_consuming(); -- tx.send(channel).unwrap(); -- }).unwrap(); -+ thread::Builder::new() -+ .name("consumer_thread".to_string()) -+ .spawn(move || { -+ channel.start_consuming(); -+ tx.send(channel).unwrap(); -+ }) -+ .unwrap(); - -- - let timeout = s.rpctimeout; - if timeout == 0 { - // Block forever until recieve -@@ -103,7 +107,7 @@ pub fn open_send(o:Options, s:Sendable, r:Option<Recei - match res { - Ok(mut ch) => { - ch.close(200, "Bye")?; -- } -+ } - Err(err) => { - if err.description() == "timed out waiting on channel".to_string() { - println!("Error timeout"); -@@ -121,10 +125,9 @@ pub fn open_send(o:Options, s:Sendable, r:Option<Recei - Ok(()) - } - -- - // narrow the string to a TableEntry type by trying to parse to known - // JSON types: bool, double and fall back on string. --fn narrow(str:&str) -> TableEntry { -+fn narrow(str: &str) -> TableEntry { - let boolv = str.parse::<bool>(); - if !boolv.is_err() { - TableEntry::Bool(boolv.unwrap()) -@@ -138,19 +141,22 @@ fn narrow(str:&str) -> TableEntry { - } - } - -- --fn _open(o:Options) -> Result<(Session, Channel),RbtError> { --// errln!("Connecting to amqp://{}:{}@{}:{}/{}", --// o.login, o.password, o.host, o.port, o.vhost); -+fn _open(o: Options) -> Result<(Session, Channel), RbtError> { -+ // errln!("Connecting to amqp://{}:{}@{}:{}/{}", -+ // o.login, o.password, o.host, o.port, o.vhost); - let mut session = Session::new(o)?; - let channel = session.open_channel(1)?; - Ok((session, channel)) - } - - impl amqp::Consumer for Receiver { -- fn handle_delivery(&mut self, channel:&mut Channel, deliver:Deliver, -- headers:BasicProperties, body:Vec<u8>){ -- -+ fn handle_delivery( -+ &mut self, -+ channel: &mut Channel, -+ deliver: Deliver, -+ headers: BasicProperties, -+ body: Vec<u8>, -+ ) { - let delivery_tag = deliver.delivery_tag.clone(); - - if self.auto_ack { -@@ -160,12 +166,15 @@ impl amqp::Consumer for Receiver { - - // and deliver to callback - ((self.callback)(channel, deliver, headers, body)).unwrap_or_else(::error::handle); -- - } - } - --pub fn open_receive(o:Options, q:Option<String>, force_declare: bool, r:Receiver) -> Result<(),RbtError> { -- -+pub fn open_receive( -+ o: Options, -+ q: Option<String>, -+ force_declare: bool, -+ r: Receiver, -+) -> Result<(), RbtError> { - // open session/channel - let (_, mut channel) = _open(o)?; - -@@ -178,22 +187,33 @@ pub fn open_receive(o:Options, q:Option<String>, force - Ok(()) - } - -- --fn do_open_receive(channel:&mut Channel, q:Option<String>, force_declare: bool, r:Receiver) -> Result<String,RbtError> { -- -+fn do_open_receive( -+ channel: &mut Channel, -+ q: Option<String>, -+ force_declare: bool, -+ r: Receiver, -+) -> Result<String, RbtError> { - let mut auto_delete = false; - let mut bind_routing_key = r.routing_key.clone(); -- -+ - let queue_name = match q { - Some(q) => { - // Force the declaration of this queue - if force_declare { - // queue, passive, durable, exclusive, auto_delete, nowait, arguments -- let queue_declare = channel.queue_declare(q, false, false, auto_delete, auto_delete, false, Table::new())?; -+ let queue_declare = channel.queue_declare( -+ q, -+ false, -+ false, -+ auto_delete, -+ auto_delete, -+ false, -+ Table::new(), -+ )?; - - // name is auto generated - queue_declare.queue -- }else{ -+ } else { - q - } - } -@@ -205,25 +225,33 @@ fn do_open_receive(channel:&mut Channel, q:Option<Stri - } - - // queue, passive, durable, exclusive, auto_delete, nowait, arguments -- let queue_declare = -- channel.queue_declare( -- q.clone().unwrap_or("".to_owned()) , -- false, false, -- auto_delete, auto_delete, false, Table::new())?; -+ let queue_declare = channel.queue_declare( -+ q.clone().unwrap_or("".to_owned()), -+ false, -+ false, -+ auto_delete, -+ auto_delete, -+ false, -+ Table::new(), -+ )?; - - // name is auto generated - queue_declare.queue -- - } - }; - - // Only bind if we have a routing key - May be an existing queue - if let Some(routing_key) = bind_routing_key { - // bind queue to the exchange, which already must be declared. -- -+ - if r.exchange != "" { -- channel.queue_bind(queue_name.clone(), r.exchange.clone(), routing_key.clone(), -- false, Table::new())?; -+ channel.queue_bind( -+ queue_name.clone(), -+ r.exchange.clone(), -+ routing_key.clone(), -+ false, -+ Table::new(), -+ )?; - } - } - -@@ -232,8 +260,16 @@ fn do_open_receive(channel:&mut Channel, q:Option<Stri - - // start consuming the queue. - // callback, queue, consumer_tag, no_local, no_ack, exclusive, nowait, arguments -- channel.basic_consume(r, queue_name.clone(), consumer_tag, false, -- false, false, false, Table::new())?; -+ channel.basic_consume( -+ r, -+ queue_name.clone(), -+ consumer_tag, -+ false, -+ false, -+ false, -+ false, -+ Table::new(), -+ )?; - - Ok(queue_name) - } |