1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
--- src/publish.rs.orig 2018-10-14 20:00:28 UTC
+++ src/publish.rs
@@ -1,41 +1,38 @@
-use std::io::{self, Write};
use amqp;
-use amqp::protocol::basic::{Deliver, BasicProperties};
+use amqp::protocol::basic::{BasicProperties, Deliver};
+use amqp::Channel;
use clap::ArgMatches;
-use std::fs;
-use std::path::Path;
-use mime;
use client;
use error::RbtError;
+use mime;
use output;
-use amqp::{Channel};
+use std::fs;
+use std::io::{self, Write};
use std::panic;
+use std::path::Path;
-
// helper function to turn a filename
// into a mime-type
-fn type_from_file(file:&String) -> Result<String,RbtError> {
+fn type_from_file(file: &String) -> Result<String, RbtError> {
let t = mime::Types::new().or(Err("Failed to read mime types"))?;
let path = Path::new(&file);
let mime = t.mime_for_path(&path);
Ok(mime.to_owned())
}
-
-pub fn do_publish(opts:amqp::Options, matches:&ArgMatches) -> Result<(),RbtError> {
-
+pub fn do_publish(opts: amqp::Options, matches: &ArgMatches) -> Result<(), RbtError> {
// either stdin or a file
let file = value_t!(matches, "file", String)?;
- let rpc = matches.is_present("rpc");
+ let rpc = matches.is_present("rpc");
let reader: Box<io::Read> = match file.as_ref() {
"-" => Box::new(io::stdin()),
- _ => Box::new(fs::File::open(&file)?),
+ _ => Box::new(fs::File::open(&file)?),
};
// either - or the name of the file
- let file_name:&str = match file.as_ref() {
+ let file_name: &str = match file.as_ref() {
"-" => "-",
- _ => {
+ _ => {
// XXX fix unwrapping
let ostr = Path::new(&file).file_name().unwrap();
ostr.to_str().unwrap()
@@ -48,51 +45,49 @@ pub fn do_publish(opts:amqp::Options, matches:&ArgMatc
let c = matches.value_of("content_type").unwrap_or("-").to_string();
match c.as_ref() {
"-" => type_from_file(&file)?,
- _ => c,
+ _ => c,
}
};
-
// the sendable wraps up the parsed parts
let sendable = client::Sendable {
- exchange: value_t!(matches, "exchange", String)?,
- routing_key: value_t!(matches, "routing_key", String)?,
+ exchange: value_t!(matches, "exchange", String)?,
+ routing_key: value_t!(matches, "routing_key", String)?,
content_type: content_type,
- headers: values_t!(matches, "header", String).unwrap_or(vec![]),
- file_name: file_name.to_owned(),
- reader: reader,
- priority: value_t!(matches, "priority", u8).unwrap_or(0),
- rpctimeout: value_t!(matches, "rpctimeout", u64).unwrap_or(0)
+ headers: values_t!(matches, "header", String).unwrap_or(vec![]),
+ file_name: file_name.to_owned(),
+ reader: reader,
+ priority: value_t!(matches, "priority", u8).unwrap_or(0),
+ rpctimeout: value_t!(matches, "rpctimeout", u64).unwrap_or(0),
};
// if we are doing an rpc with replyTo/correlationId, we set up a receiver
let rpc_receive = match rpc {
false => None,
- true => {
- let receive =
- move |channel: &mut Channel, deliver:Deliver, props:BasicProperties, body:Vec<u8>| ->
- Result<(),RbtError> {
- let msg = output::build_output(false, &deliver, &props, body)?;
+ true => {
+ let receive = move |channel: &mut Channel,
+ deliver: Deliver,
+ props: BasicProperties,
+ body: Vec<u8>|
+ -> Result<(), RbtError> {
+ let msg = output::build_output(false, &deliver, &props, body)?;
- // just write to stdout
- let stdout = io::stdout();
+ // just write to stdout
+ let stdout = io::stdout();
- // lock until end of scope
- let mut handle = stdout.lock();
+ // lock until end of scope
+ let mut handle = stdout.lock();
- handle.write(&msg)?;
- handle.write(b"\n")?;
- handle.flush()?;
-
- // closing the channel
- channel.close(200, "Bye")?;
- panic::set_hook(Box::new(|_| {
- }));
- // Until amqp library finds a way to exit consumer, terminate consumer_thread here.
- panic!();
-
-
- };
+ handle.write(&msg)?;
+ handle.write(b"\n")?;
+ handle.flush()?;
+
+ // closing the channel
+ channel.close(200, "Bye")?;
+ panic::set_hook(Box::new(|_| {}));
+ // Until amqp library finds a way to exit consumer, terminate consumer_thread here.
+ panic!();
+ };
let receiver = client::Receiver {
exchange: "".to_owned(),
|