Ed: solved with the help of the async_stream crate.
I’m struggling with the borrow checker!
My problem: I’m using actix-web and rusqlite. I want to return an unlimited number of records from an rusqlite query, and actix provides a Stream trait for that kind of thing. You just impl the trait and return your records from a poll_next() fn.
On the rusqlite side, there’s this query_map that returns an iterator of records from a query. All I have to do is smush these two features together.
So the plan is to put the iterator returned by query_map into a struct that impls Stream. Problem is the lifetime of a var used by query_map. How to make the var have the same lifetime as the iterator??
So here’s the code:
pub struct ZkNoteStream<'a, T> {
rec_iter: Box<dyn Iterator<Item = T> + 'a>,
}
// impl of Stream just calls next() on the iterator. This compiles fine.
impl<'a> Stream for ZkNoteStream<'a, serde_json::Value> {
type Item = serde_json::Value;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.rec_iter.next())
}
}
// init function to set up the ZkNoteStream.
impl<'a> ZkNoteStream<'a, Result<ZkListNote, rusqlite::Error>> {
pub fn init(
conn: &'a Connection,
user: i64,
search: &ZkNoteSearch,
) -> Result<Self, Box<dyn Error>> {
let (sql, args) = build_sql(&conn, user, search.clone())?;
let sysid = user_id(&conn, "system")?;
let mut pstmt = conn.prepare(sql.as_str())?;
// Here's the problem! Borrowing pstmt.
let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
let id = row.get(0)?;
let sysids = get_sysids(&conn, sysid, id)?;
Ok(ZkListNote {
id: id,
title: row.get(1)?,
is_file: {
let wat: Option<i64> = row.get(2)?;
wat.is_some()
},
user: row.get(3)?,
createdate: row.get(4)?,
changeddate: row.get(5)?,
sysids: sysids,
})
})?;
Ok(ZkNoteStream::<Result<ZkListNote, rusqlite::Error>> {
rec_iter: Box::new(rec_iter),
})
}
}
And here’s the error:
error[E0515]: cannot return value referencing local variable `pstmt`
--> server-lib/src/search.rs:170:5
|
153 | let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
| ----- `pstmt` is borrowed here
...
170 | / Ok(ZkNoteStream::<Result<ZkListNote, rusqlite::Error>> {
171 | | rec_iter: Box::new(rec_iter),
172 | | })
| |______^ returns a value referencing data owned by the current function
So basically it boils down to pstmt getting borrowed in the query_map call. It needs to have the same lifetime as the closure. How do I ensure that?
Ok I have a simple solution for you. What you need to do is to produce
rec_iter
in such a way thatpstmt
is dropped at the same time. You can do that by creating and callingpstmt
in a block like this:let rec_iter = { let mut pstmt = conn.prepare(sql.as_str())?; pstmt.query_map(/* ... */)? };
Here is a Rust Playground example
So close! The problem is that query_map doesn’t consume pstmt, it only references it.
Error[E0515]: cannot return value referencing local variable `pstmt` --> server-lib/src/search.rs:191:5 | 162 | let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| { | ----- `pstmt` is borrowed here ... 191 | / Ok(ZkNoteStream { 192 | | rec_iter: Box::new(bytes_iter), 193 | | }) | |______^ returns a value referencing data owned by the current function
(bytes_iter is rec_iter run through some map()s)
Did you try adding the block like I suggested? I reproduced the same error that you are seeing, and verified in my playground snippet that the block fixes the problem.
It’s ok that query_map references pstmt. Isolating pstmt in a block causes it to be dropped at the end of the block while rec_iter continues to live. I think that might be thanks to temporary lifetime extension or something.
Yep, gave that a try (I think!). Here’s that version.
pub struct ZkNoteStream<'a> { rec_iter: Box<dyn Iterator<Item = Bytes> + 'a>, } impl<'a> ZkNoteStream<'a> { pub fn init(conn: Connection, user: i64, search: &ZkNoteSearch) -> Result<Self, Box<dyn Error>> { let (sql, args) = build_sql(&conn, user, search.clone())?; let sysid = user_id(&conn, "system")?; let bytes_iter = { let mut pstmt = conn.prepare(sql.as_str())?; let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| { let id = row.get(0)?; Ok(ZkListNote { id: id, title: row.get(1)?, is_file: { let wat: Option<i64> = row.get(2)?; wat.is_some() }, user: row.get(3)?, createdate: row.get(4)?, changeddate: row.get(5)?, sysids: Vec::new(), }) })?; let val_iter = rec_iter .filter_map(|x| x.ok()) .map(|x| serde_json::to_value(x).map_err(|e| e.into())); val_iter .filter_map(|x: Result<serde_json::Value, orgauth::error::Error>| x.ok()) .map(|x| Bytes::from(x.to_string())) }; Ok(ZkNoteStream { rec_iter: Box::new(bytes_iter), }) } } impl<'a> Stream for ZkNoteStream<'a> { type Item = Bytes; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { Poll::Ready(self.rec_iter.next()) } }
This gets two errors, one for the conn and one for the pstmt:
error[E0515]: cannot return value referencing local variable `pstmt` --> server-lib/src/search.rs:181:5 | 153 | let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| { | ----- `pstmt` is borrowed here ... 181 | / Ok(ZkNoteStream { 182 | | rec_iter: Box::new(bytes_iter), 183 | | }) | |______^ returns a value referencing data owned by the current function error[E0515]: cannot return value referencing function parameter `conn` --> server-lib/src/search.rs:181:5 | 152 | let mut pstmt = conn.prepare(sql.as_str())?; | ---- `conn` is borrowed here ... 181 | / Ok(ZkNoteStream { 182 | | rec_iter: Box::new(bytes_iter), 183 | | }) | |______^ returns a value referencing data owned by the current function
Oh I’m sorry! I messed up my test case so it only looked like the block fixed things.
I’m not quite ready to give up, but its not looking good. One of the rusqlite maintainers issued this haiku-like missive:
yeah you just have to collect you can't return that as an iterator it needs to borrow from the statement
Got to wondering how Vec does this interator-with-internal-state thing, and its with unsafe.
update from the maintainer on discord:
fundamentally you're asking for a self-referential type. e.g. one field borrows from another field of the same struct. cant be done without unsafe very easy to have soundness holes even if you use unsafe
Well if you want to try another avenue, I’ve read about implementing self-referential structs using
Pin
.There’s a discussion here, https://blog.cloudflare.com/pin-and-unpin-in-rust/
There’s a deep dive on pins here, but I don’t remember if it addresses self-referential types: https://fasterthanli.me/articles/pin-and-suffering
I’ve been looking into it a bit - not pinning per se but self referential. There’s a library called ouroboros that looks helpful. There’s even an example on github where someone uses rusqlite and ouroboros together.
So it seems like this should work:
#[self_referencing] pub struct ZkNoteStream { conn: Connection, #[borrows(conn)] pstmt: rusqlite::Statement<'this>, #[borrows(mut pstmt)] #[covariant] rec_iter: rusqlite::Rows<'this>, } impl ZkNoteStream { pub fn init(conn: Connection, user: i64, search: &ZkNoteSearch) -> Result<Self, Box<dyn Error>> { let (sql, args) = build_sql(&conn, user, search.clone())?; Ok( ZkNoteStreamTryBuilder { conn: conn, pstmt_builder: |conn: &Connection| conn.prepare(sql.as_str()), rec_iter_builder: |pstmt: &mut rusqlite::Statement<'_>| { pstmt.query(rusqlite::params_from_iter(args.iter())) }, } .try_build()?, ) } }
Unfortunately I get this:
error[E0597]: `pstmt` does not live long enough --> server-lib/src/search.rs:880:1 | 880 | #[self_referencing] | ^^^^^^^^^^^^^^^^^^- | | | | | `pstmt` dropped here while still borrowed | | borrow might be used here, when `pstmt` is dropped and runs the `Drop` code for type `Statement` | borrowed value does not live long enough | = note: this error originates in the attribute macro `self_referencing` (in Nightly builds, run with -Z macro-backtrace for more info)
So close! But no cigar so far. No idea why its complaining.
Got this solved over on the rust-lang discourse. The solution was to use the async_stream crate. Ends up being a small amount of code too.
Good to hear!