1use std::{
4 future::Future,
5 panic,
6 pin::Pin,
7 sync::Arc,
8 task::{Context, Poll, Waker},
9};
10
11use pyo3_macros::{pyclass, pymethods};
12
13use crate::{
14 coroutine::{cancel::ThrowCallback, waker::AsyncioWaker},
15 exceptions::{PyAttributeError, PyRuntimeError, PyStopIteration},
16 panic::PanicException,
17 types::{string::PyStringMethods, PyIterator, PyString},
18 Bound, IntoPyObject, IntoPyObjectExt, Py, PyAny, PyErr, PyObject, PyResult, Python,
19};
20
21pub(crate) mod cancel;
22mod waker;
23
24pub use cancel::CancelHandle;
25
/// Message of the `RuntimeError` raised when a coroutine is driven again after its wrapped
/// future has already been dropped (completed, panicked, or closed).
const COROUTINE_REUSED_ERROR: &str = "cannot reuse already awaited coroutine";
27
28#[pyclass(crate = "crate")]
30pub struct Coroutine {
31 name: Option<Py<PyString>>,
32 qualname_prefix: Option<&'static str>,
33 throw_callback: Option<ThrowCallback>,
34 future: Option<Pin<Box<dyn Future<Output = PyResult<PyObject>> + Send>>>,
35 waker: Option<Arc<AsyncioWaker>>,
36}
37
38unsafe impl Sync for Coroutine {}
41
42impl Coroutine {
43 pub(crate) fn new<'py, F, T, E>(
50 name: Option<Bound<'py, PyString>>,
51 qualname_prefix: Option<&'static str>,
52 throw_callback: Option<ThrowCallback>,
53 future: F,
54 ) -> Self
55 where
56 F: Future<Output = Result<T, E>> + Send + 'static,
57 T: IntoPyObject<'py>,
58 E: Into<PyErr>,
59 {
60 let wrap = async move {
61 let obj = future.await.map_err(Into::into)?;
62 obj.into_py_any(unsafe { Python::assume_gil_acquired() })
64 };
65 Self {
66 name: name.map(Bound::unbind),
67 qualname_prefix,
68 throw_callback,
69 future: Some(Box::pin(wrap)),
70 waker: None,
71 }
72 }
73
74 fn poll(&mut self, py: Python<'_>, throw: Option<PyObject>) -> PyResult<PyObject> {
75 let future_rs = match self.future {
77 Some(ref mut fut) => fut,
78 None => return Err(PyRuntimeError::new_err(COROUTINE_REUSED_ERROR)),
79 };
80 match (throw, &self.throw_callback) {
82 (Some(exc), Some(cb)) => cb.throw(exc),
83 (Some(exc), None) => {
84 self.close();
85 return Err(PyErr::from_value(exc.into_bound(py)));
86 }
87 (None, _) => {}
88 }
89 if let Some(waker) = self.waker.as_mut().and_then(Arc::get_mut) {
91 waker.reset();
92 } else {
93 self.waker = Some(Arc::new(AsyncioWaker::new()));
94 }
95 let waker = Waker::from(self.waker.clone().unwrap());
96 let poll = || future_rs.as_mut().poll(&mut Context::from_waker(&waker));
99 match panic::catch_unwind(panic::AssertUnwindSafe(poll)) {
100 Ok(Poll::Ready(res)) => {
101 self.close();
102 return Err(PyStopIteration::new_err((res?,)));
103 }
104 Err(err) => {
105 self.close();
106 return Err(PanicException::from_panic_payload(err));
107 }
108 _ => {}
109 }
110 if let Some(future) = self.waker.as_ref().unwrap().initialize_future(py)? {
112 if let Some(future) = PyIterator::from_object(future).unwrap().next() {
115 return Ok(future.unwrap().into());
118 }
119 }
120 Ok(py.None())
123 }
124}
125
126#[pymethods(crate = "crate")]
127impl Coroutine {
128 #[getter]
129 fn __name__(&self, py: Python<'_>) -> PyResult<Py<PyString>> {
130 match &self.name {
131 Some(name) => Ok(name.clone_ref(py)),
132 None => Err(PyAttributeError::new_err("__name__")),
133 }
134 }
135
136 #[getter]
137 fn __qualname__<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyString>> {
138 match (&self.name, &self.qualname_prefix) {
139 (Some(name), Some(prefix)) => Ok(PyString::new(
140 py,
141 &format!("{}.{}", prefix, name.bind(py).to_cow()?),
142 )),
143 (Some(name), None) => Ok(name.bind(py).clone()),
144 (None, _) => Err(PyAttributeError::new_err("__qualname__")),
145 }
146 }
147
148 fn send(&mut self, py: Python<'_>, _value: &Bound<'_, PyAny>) -> PyResult<PyObject> {
149 self.poll(py, None)
150 }
151
152 fn throw(&mut self, py: Python<'_>, exc: PyObject) -> PyResult<PyObject> {
153 self.poll(py, Some(exc))
154 }
155
156 fn close(&mut self) {
157 drop(self.future.take());
160 }
161
162 fn __await__(self_: Py<Self>) -> Py<Self> {
163 self_
164 }
165
166 fn __next__(&mut self, py: Python<'_>) -> PyResult<PyObject> {
167 self.poll(py, None)
168 }
169}