use futures_util::sink::{unfold, Unfold}; use futures_util::{Sink, SinkExt, Stream}; use std::fmt::Display; use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use tokio::sync::mpsc; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::{Receiver, Sender}; pub struct MockStreamSinkControl { pub incoming: Sender, pub outgoing: Receiver, } pub struct MockStreamSink { stream_rx: Receiver, sink_tx: Pin>>, } impl Stream for MockStreamSink where Self: Unpin, { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.stream_rx.poll_recv(cx) } } impl Sink for MockStreamSink where U1: FnMut(u32, R) -> U2, U2: Future>, { type Error = E; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.sink_tx.poll_ready_unpin(cx) } fn start_send(mut self: Pin<&mut Self>, item: R) -> Result<(), Self::Error> { self.sink_tx.start_send_unpin(item) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.sink_tx.poll_flush_unpin(cx) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.sink_tx.poll_close_unpin(cx) } } pub fn create_mock_stream_sink() -> ( MockStreamSinkControl, impl Stream + Sink, ) { let (stream_tx, stream_rx) = mpsc::channel::(1); let (sink_tx, sink_rx) = mpsc::channel::(1); let sink_tx = Arc::new(sink_tx); ( MockStreamSinkControl { incoming: stream_tx, outgoing: sink_rx, }, MockStreamSink:: { stream_rx, sink_tx: Box::pin(unfold(0u32, move |_, item| { let sink_tx = sink_tx.clone(); async move { sink_tx.send(item).await?; Ok(0u32) as Result<_, SendError> } })), }, ) }