|
| 1 | +//! DO NOT EDIT - this is a copy of gix-packetline/src/encode/async_io.rs. Run `just copy-packetline` to update it. |
| 2 | +
|
| 3 | +use std::{ |
| 4 | + io, |
| 5 | + pin::Pin, |
| 6 | + task::{Context, Poll}, |
| 7 | +}; |
| 8 | + |
| 9 | +use futures_io::AsyncWrite; |
| 10 | +use futures_lite::AsyncWriteExt; |
| 11 | + |
| 12 | +use super::u16_to_hex; |
| 13 | +use crate::{encode::Error, Channel, DELIMITER_LINE, ERR_PREFIX, FLUSH_LINE, MAX_DATA_LEN, RESPONSE_END_LINE}; |
| 14 | + |
| 15 | +pin_project_lite::pin_project! { |
| 16 | + /// A way of writing packet lines asynchronously. |
| 17 | + pub struct LineWriter<'a, W> { |
| 18 | + #[pin] |
| 19 | + pub(crate) writer: W, |
| 20 | + pub(crate) prefix: &'a [u8], |
| 21 | + pub(crate) suffix: &'a [u8], |
| 22 | + state: State<'a>, |
| 23 | + } |
| 24 | +} |
| 25 | + |
| 26 | +enum State<'a> { |
| 27 | + Idle, |
| 28 | + WriteHexLen([u8; 4], usize), |
| 29 | + WritePrefix(&'a [u8]), |
| 30 | + WriteData(usize), |
| 31 | + WriteSuffix(&'a [u8]), |
| 32 | +} |
| 33 | + |
| 34 | +impl<'a, W: AsyncWrite + Unpin> LineWriter<'a, W> { |
| 35 | + /// Create a new line writer writing data with a `prefix` and `suffix`. |
| 36 | + /// |
| 37 | + /// Keep the additional `prefix` or `suffix` buffers empty if no prefix or suffix should be written. |
| 38 | + pub fn new(writer: W, prefix: &'a [u8], suffix: &'a [u8]) -> Self { |
| 39 | + LineWriter { |
| 40 | + writer, |
| 41 | + prefix, |
| 42 | + suffix, |
| 43 | + state: State::Idle, |
| 44 | + } |
| 45 | + } |
| 46 | + |
| 47 | + /// Consume self and reveal the inner writer. |
| 48 | + pub fn into_inner(self) -> W { |
| 49 | + self.writer |
| 50 | + } |
| 51 | +} |
| 52 | + |
| 53 | +fn into_io_err(err: Error) -> io::Error { |
| 54 | + io::Error::new(io::ErrorKind::Other, err) |
| 55 | +} |
| 56 | + |
| 57 | +impl<W: AsyncWrite + Unpin> AsyncWrite for LineWriter<'_, W> { |
| 58 | + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, data: &[u8]) -> Poll<io::Result<usize>> { |
| 59 | + use futures_lite::ready; |
| 60 | + let mut this = self.project(); |
| 61 | + loop { |
| 62 | + match &mut this.state { |
| 63 | + State::Idle => { |
| 64 | + let data_len = this.prefix.len() + data.len() + this.suffix.len(); |
| 65 | + if data_len > MAX_DATA_LEN { |
| 66 | + return Poll::Ready(Err(into_io_err(Error::DataLengthLimitExceeded { |
| 67 | + length_in_bytes: data_len, |
| 68 | + }))); |
| 69 | + } |
| 70 | + if data.is_empty() { |
| 71 | + return Poll::Ready(Err(into_io_err(Error::DataIsEmpty))); |
| 72 | + } |
| 73 | + let data_len = data_len + 4; |
| 74 | + let len_buf = u16_to_hex(data_len as u16); |
| 75 | + *this.state = State::WriteHexLen(len_buf, 0) |
| 76 | + } |
| 77 | + State::WriteHexLen(hex_len, written) => { |
| 78 | + while *written != hex_len.len() { |
| 79 | + let n = ready!(this.writer.as_mut().poll_write(cx, &hex_len[*written..]))?; |
| 80 | + if n == 0 { |
| 81 | + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); |
| 82 | + } |
| 83 | + *written += n; |
| 84 | + } |
| 85 | + if this.prefix.is_empty() { |
| 86 | + *this.state = State::WriteData(0) |
| 87 | + } else { |
| 88 | + *this.state = State::WritePrefix(this.prefix) |
| 89 | + } |
| 90 | + } |
| 91 | + State::WritePrefix(buf) => { |
| 92 | + while !buf.is_empty() { |
| 93 | + let n = ready!(this.writer.as_mut().poll_write(cx, buf))?; |
| 94 | + if n == 0 { |
| 95 | + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); |
| 96 | + } |
| 97 | + let (_, rest) = std::mem::take(buf).split_at(n); |
| 98 | + *buf = rest; |
| 99 | + } |
| 100 | + *this.state = State::WriteData(0) |
| 101 | + } |
| 102 | + State::WriteData(written) => { |
| 103 | + while *written != data.len() { |
| 104 | + let n = ready!(this.writer.as_mut().poll_write(cx, &data[*written..]))?; |
| 105 | + if n == 0 { |
| 106 | + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); |
| 107 | + } |
| 108 | + *written += n; |
| 109 | + } |
| 110 | + if this.suffix.is_empty() { |
| 111 | + let written = 4 + this.prefix.len() + *written; |
| 112 | + *this.state = State::Idle; |
| 113 | + return Poll::Ready(Ok(written)); |
| 114 | + } else { |
| 115 | + *this.state = State::WriteSuffix(this.suffix) |
| 116 | + } |
| 117 | + } |
| 118 | + State::WriteSuffix(buf) => { |
| 119 | + while !buf.is_empty() { |
| 120 | + let n = ready!(this.writer.as_mut().poll_write(cx, buf))?; |
| 121 | + if n == 0 { |
| 122 | + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); |
| 123 | + } |
| 124 | + let (_, rest) = std::mem::take(buf).split_at(n); |
| 125 | + *buf = rest; |
| 126 | + } |
| 127 | + *this.state = State::Idle; |
| 128 | + return Poll::Ready(Ok(4 + this.prefix.len() + data.len() + this.suffix.len())); |
| 129 | + } |
| 130 | + } |
| 131 | + } |
| 132 | + } |
| 133 | + |
| 134 | + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 135 | + let this = self.project(); |
| 136 | + this.writer.poll_flush(cx) |
| 137 | + } |
| 138 | + |
| 139 | + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 140 | + let this = self.project(); |
| 141 | + this.writer.poll_close(cx) |
| 142 | + } |
| 143 | +} |
| 144 | + |
| 145 | +async fn prefixed_and_suffixed_data_to_write( |
| 146 | + prefix: &[u8], |
| 147 | + data: &[u8], |
| 148 | + suffix: &[u8], |
| 149 | + mut out: impl AsyncWrite + Unpin, |
| 150 | +) -> io::Result<usize> { |
| 151 | + let data_len = prefix.len() + data.len() + suffix.len(); |
| 152 | + if data_len > MAX_DATA_LEN { |
| 153 | + return Err(into_io_err(Error::DataLengthLimitExceeded { |
| 154 | + length_in_bytes: data_len, |
| 155 | + })); |
| 156 | + } |
| 157 | + if data.is_empty() { |
| 158 | + return Err(into_io_err(Error::DataIsEmpty)); |
| 159 | + } |
| 160 | + |
| 161 | + let data_len = data_len + 4; |
| 162 | + let buf = u16_to_hex(data_len as u16); |
| 163 | + |
| 164 | + out.write_all(&buf).await?; |
| 165 | + if !prefix.is_empty() { |
| 166 | + out.write_all(prefix).await?; |
| 167 | + } |
| 168 | + out.write_all(data).await?; |
| 169 | + if !suffix.is_empty() { |
| 170 | + out.write_all(suffix).await?; |
| 171 | + } |
| 172 | + Ok(data_len) |
| 173 | +} |
| 174 | + |
| 175 | +async fn prefixed_data_to_write(prefix: &[u8], data: &[u8], out: impl AsyncWrite + Unpin) -> io::Result<usize> { |
| 176 | + prefixed_and_suffixed_data_to_write(prefix, data, &[], out).await |
| 177 | +} |
| 178 | + |
| 179 | +/// Write a `text` message to `out`, which is assured to end in a newline. |
| 180 | +pub async fn text_to_write(text: &[u8], out: impl AsyncWrite + Unpin) -> io::Result<usize> { |
| 181 | + prefixed_and_suffixed_data_to_write(&[], text, &[b'\n'], out).await |
| 182 | +} |
| 183 | + |
| 184 | +/// Write a `data` message to `out`. |
| 185 | +pub async fn data_to_write(data: &[u8], out: impl AsyncWrite + Unpin) -> io::Result<usize> { |
| 186 | + prefixed_data_to_write(&[], data, out).await |
| 187 | +} |
| 188 | + |
| 189 | +/// Write an error `message` to `out`. |
| 190 | +pub async fn error_to_write(message: &[u8], out: impl AsyncWrite + Unpin) -> io::Result<usize> { |
| 191 | + prefixed_data_to_write(ERR_PREFIX, message, out).await |
| 192 | +} |
| 193 | + |
| 194 | +/// Write a response-end message to `out`. |
| 195 | +pub async fn response_end_to_write(mut out: impl AsyncWrite + Unpin) -> io::Result<usize> { |
| 196 | + out.write_all(RESPONSE_END_LINE).await?; |
| 197 | + Ok(4) |
| 198 | +} |
| 199 | + |
| 200 | +/// Write a delim message to `out`. |
| 201 | +pub async fn delim_to_write(mut out: impl AsyncWrite + Unpin) -> io::Result<usize> { |
| 202 | + out.write_all(DELIMITER_LINE).await?; |
| 203 | + Ok(4) |
| 204 | +} |
| 205 | + |
| 206 | +/// Write a flush message to `out`. |
| 207 | +pub async fn flush_to_write(mut out: impl AsyncWrite + Unpin) -> io::Result<usize> { |
| 208 | + out.write_all(FLUSH_LINE).await?; |
| 209 | + Ok(4) |
| 210 | +} |
| 211 | + |
| 212 | +/// Write `data` of `kind` to `out` using side-band encoding. |
| 213 | +pub async fn band_to_write(kind: Channel, data: &[u8], out: impl AsyncWrite + Unpin) -> io::Result<usize> { |
| 214 | + prefixed_data_to_write(&[kind as u8], data, out).await |
| 215 | +} |
0 commit comments