trillium_http/received_body/
fixed_length.rs1use super::{
2 AsyncRead, AsyncWrite, Context, End, ErrorKind, FixedLength, Ready, ReceivedBody, StateOutput,
3 io, ready,
4};
5
6impl<Transport> ReceivedBody<'_, Transport>
7where
8 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
9{
10 #[inline]
11 pub(super) fn handle_fixed_length(
12 &mut self,
13 cx: &mut Context<'_>,
14 buf: &mut [u8],
15 current_index: u64,
16 total_length: u64,
17 ) -> StateOutput {
18 let len = buf.len();
19 let remaining = usize::try_from(total_length - current_index).unwrap_or(usize::MAX);
20 let buf = &mut buf[..len.min(remaining)];
21 let bytes = ready!(self.read_raw(cx, buf)?);
22 let current_index = current_index + bytes as u64;
23 if current_index == total_length {
24 Ready(Ok((End, bytes)))
25 } else if bytes == 0 {
26 Ready(Err(io::Error::from(ErrorKind::ConnectionAborted)))
27 } else {
28 Ready(Ok((
29 FixedLength {
30 current_index,
31 total: total_length,
32 },
33 bytes,
34 )))
35 }
36 }
37}
38
#[cfg(test)]
mod tests {
    use crate::{Buffer, HttpConfig, ReceivedBody, ReceivedBodyState};
    use encoding_rs::UTF_8;
    use futures_lite::{AsyncRead, AsyncReadExt, future::block_on, io::Cursor};

    /// Builds a `ReceivedBody` in the `Start` state over an in-memory cursor
    /// holding `input`, declaring `input.len()` as the body length.
    fn new_with_config(
        input: String,
        config: &HttpConfig,
    ) -> ReceivedBody<'static, Cursor<Vec<u8>>> {
        let declared_length = input.len() as u64;
        let transport = Cursor::new(input.into_bytes());

        ReceivedBody::new_with_config(
            Some(declared_length),
            Buffer::with_capacity(100),
            transport,
            ReceivedBodyState::Start,
            None,
            UTF_8,
            config,
        )
    }

    /// Reads a body built from `input` to completion, using read buffers of
    /// `poll_size` bytes, and returns the collected text.
    fn decode_with_config(
        input: String,
        poll_size: usize,
        config: &HttpConfig,
    ) -> crate::Result<String> {
        let mut body = new_with_config(input, config);
        block_on(read_with_buffers_of_size(&mut body, poll_size))
    }

    /// Drains `reader` with a fresh `size`-byte buffer per read until a read
    /// returns zero bytes, then returns everything read as a lossily decoded
    /// UTF-8 string.
    async fn read_with_buffers_of_size<R>(reader: &mut R, size: usize) -> crate::Result<String>
    where
        R: AsyncRead + Unpin,
    {
        let mut collected = vec![];
        loop {
            let mut chunk = vec![0; size];
            let count = reader.read(&mut chunk).await?;
            if count == 0 {
                return Ok(String::from_utf8_lossy(&collected).into());
            }
            collected.extend_from_slice(&chunk[..count]);
        }
    }

    #[test]
    fn test() {
        for size in 3..50 {
            // Bodies must round-trip unchanged whatever the read buffer size.
            for expected in ["12345abcdef", "MozillaDeveloperNetwork"] {
                let output =
                    decode_with_config(expected.into(), size, &HttpConfig::DEFAULT).unwrap();
                assert_eq!(output, expected, "size: {size}");
            }

            // An empty body is expected to decode successfully.
            let empty = decode_with_config(String::new(), size, &HttpConfig::DEFAULT);
            assert!(empty.is_ok());

            // A 23-byte body with a configured maximum of 5 is expected to fail.
            let over_limit = decode_with_config(
                "MozillaDeveloperNetwork".into(),
                size,
                &HttpConfig::DEFAULT.with_received_body_max_len(5),
            );
            assert!(over_limit.is_err());
        }
    }

    #[test]
    fn read_string_and_read_bytes() {
        block_on(async {
            let content = "test ".repeat(1000);

            // Every assertion below starts from its own freshly built body.
            let with_default_config = || new_with_config(content.clone(), &HttpConfig::DEFAULT);
            let with_config_limit = || {
                new_with_config(
                    content.clone(),
                    &HttpConfig::DEFAULT.with_received_body_max_len(750),
                )
            };

            // With the default config the whole 5000-byte body is returned.
            let text = with_default_config().read_string().await.unwrap();
            assert_eq!(text.len(), 5000);

            let bytes = with_default_config().read_bytes().await.unwrap();
            assert_eq!(bytes.len(), 5000);

            // A maximum of 750 set through the config is expected to fail both reads.
            assert!(with_config_limit().read_string().await.is_err());
            assert!(with_config_limit().read_bytes().await.is_err());

            // The same maximum set on the body itself is expected to fail as well.
            assert!(
                with_default_config()
                    .with_max_len(750)
                    .read_bytes()
                    .await
                    .is_err()
            );
            assert!(
                with_default_config()
                    .with_max_len(750)
                    .read_string()
                    .await
                    .is_err()
            );
        });
    }
}
168}