Edit: solved with the help of the async_stream crate.

I’m struggling with the borrow checker!

My problem: I’m using actix-web and rusqlite. I want to return an unlimited number of records from a rusqlite query, and actix-web can stream a response body from anything that implements the Stream trait (from the futures crate). You just impl the trait and return your records from a poll_next() fn.

On the rusqlite side, there’s this query_map that returns an iterator of records from a query. All I have to do is smush these two features together.

So the plan is to put the iterator returned by query_map into a struct that impls Stream. Problem is the lifetime of a var used by query_map. How to make the var have the same lifetime as the iterator??

So here’s the code:

pub struct ZkNoteStream<'a, T> {
  rec_iter: Box<dyn Iterator<Item = T> + 'a>,
}

// impl of Stream just calls next() on the iterator.  This compiles fine.
impl<'a> Stream for ZkNoteStream<'a, serde_json::Value> {
  type Item = serde_json::Value;

  fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    Poll::Ready(self.rec_iter.next())
  }
}

// init function to set up the ZkNoteStream.
impl<'a> ZkNoteStream<'a, Result<ZkListNote, rusqlite::Error>> {
  pub fn init(
    conn: &'a Connection,
    user: i64,
    search: &ZkNoteSearch,
  ) -> Result<Self, Box<dyn Error>> {
    let (sql, args) = build_sql(&conn, user, search.clone())?;

    let sysid = user_id(&conn, "system")?;
    let mut pstmt = conn.prepare(sql.as_str())?;

    // Here's the problem!  Borrowing pstmt.
    let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
      let id = row.get(0)?;
      let sysids = get_sysids(&conn, sysid, id)?;
      Ok(ZkListNote {
        id: id,
        title: row.get(1)?,
        is_file: {
          let wat: Option<i64> = row.get(2)?;
          wat.is_some()
        },
        user: row.get(3)?,
        createdate: row.get(4)?,
        changeddate: row.get(5)?,
        sysids: sysids,
      })
    })?;

    Ok(ZkNoteStream::<Result<ZkListNote, rusqlite::Error>> {
      rec_iter: Box::new(rec_iter),
    })
  }
}

And here’s the error:

error[E0515]: cannot return value referencing local variable `pstmt`
   --> server-lib/src/search.rs:170:5
    |
153 |       let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
    |                      ----- `pstmt` is borrowed here
...
170 | /     Ok(ZkNoteStream::<Result<ZkListNote, rusqlite::Error>> {
171 | |       rec_iter: Box::new(rec_iter),
172 | |     })
    | |______^ returns a value referencing data owned by the current function

So basically it boils down to pstmt getting borrowed in the query_map call. It needs to live as long as the iterator that query_map returns. How do I ensure that?

  • hallettj@beehaw.org
    1 year ago

    Did you try adding the block like I suggested? I reproduced the same error that you are seeing, and verified in my playground snippet that the block fixes the problem.

    It’s ok that query_map references pstmt. Isolating pstmt in a block causes it to be dropped at the end of the block while rec_iter continues to live. I think that might be thanks to temporary lifetime extension or something.

    • pr06lefsOP
      1 year ago

      Yep, gave that a try (I think!). Here’s that version.

      pub struct ZkNoteStream<'a> {
        rec_iter: Box<dyn Iterator<Item = Bytes> + 'a>,
      }
      
      impl<'a> ZkNoteStream<'a> {
        pub fn init(conn: Connection, user: i64, search: &ZkNoteSearch) -> Result<Self, Box<dyn Error>> {
          let (sql, args) = build_sql(&conn, user, search.clone())?;
      
          let sysid = user_id(&conn, "system")?;
      
          let bytes_iter = {
            let mut pstmt = conn.prepare(sql.as_str())?;
            let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
              let id = row.get(0)?;
              Ok(ZkListNote {
                id: id,
                title: row.get(1)?,
                is_file: {
                  let wat: Option<i64> = row.get(2)?;
                  wat.is_some()
                },
                user: row.get(3)?,
                createdate: row.get(4)?,
                changeddate: row.get(5)?,
                sysids: Vec::new(),
              })
            })?;
      
            let val_iter = rec_iter
              .filter_map(|x| x.ok())
              .map(|x| serde_json::to_value(x).map_err(|e| e.into()));
      
            val_iter
              .filter_map(|x: Result<serde_json::Value, orgauth::error::Error>| x.ok())
              .map(|x| Bytes::from(x.to_string()))
          };
      
          Ok(ZkNoteStream {
            rec_iter: Box::new(bytes_iter),
          })
        }
      }
      
      impl<'a> Stream for ZkNoteStream<'a> {
        type Item = Bytes;
      
        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
          Poll::Ready(self.rec_iter.next())
        }
      }
      

      This gets two errors, one for the conn and one for the pstmt:

      error[E0515]: cannot return value referencing local variable `pstmt`
         --> server-lib/src/search.rs:181:5
          |
      153 |         let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
          |                        ----- `pstmt` is borrowed here
      ...
      181 | /     Ok(ZkNoteStream {
      182 | |       rec_iter: Box::new(bytes_iter),
      183 | |     })
          | |______^ returns a value referencing data owned by the current function
      
      error[E0515]: cannot return value referencing function parameter `conn`
         --> server-lib/src/search.rs:181:5
          |
      152 |         let mut pstmt = conn.prepare(sql.as_str())?;
          |                         ---- `conn` is borrowed here
      ...
      181 | /     Ok(ZkNoteStream {
      182 | |       rec_iter: Box::new(bytes_iter),
      183 | |     })
          | |______^ returns a value referencing data owned by the current function
      
      • hallettj@beehaw.org
        1 year ago

        Oh I’m sorry! I messed up my test case so it only looked like the block fixed things.

        • pr06lefsOP
          1 year ago

          I’m not quite ready to give up, but it’s not looking good. One of the rusqlite maintainers issued this haiku-like missive:

          yeah you just have to collect
          you can't return that as an iterator
          it needs to borrow from the statement
          

          Got to wondering how Vec does this iterator-with-internal-state thing, and it’s with unsafe.

          update from the maintainer on discord:

          fundamentally you're asking for a self-referential type. e.g. one field borrows from another field of the same struct. cant be done without unsafe
          very easy to have soundness holes even if you use unsafe
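
A minimal sketch of the "just collect" advice, using std-only stand-ins rather than rusqlite itself. `Conn`, `Stmt`, `Note` and `list_notes` are hypothetical names, and the `query_map` here only mimics the borrowing shape of the real one. The iterator borrows the statement, so it is drained into a Vec while the statement is still alive, and only owned data leaves the function. The cost is that every row is buffered in memory first, which may not suit an unbounded result set.

```rust
/// Hypothetical stand-in for rusqlite::Connection: owns the table data.
struct Conn {
    rows: Vec<(i64, String)>,
}

/// Hypothetical stand-in for rusqlite::Statement: borrows the connection.
struct Stmt<'conn> {
    conn: &'conn Conn,
}

impl Conn {
    fn prepare(&self) -> Stmt<'_> {
        Stmt { conn: self }
    }
}

impl<'conn> Stmt<'conn> {
    /// Mimics the shape of query_map: the returned iterator borrows `self`,
    /// so it cannot outlive the statement it came from.
    fn query_map<'stmt, T, F>(&'stmt mut self, f: F) -> Box<dyn Iterator<Item = T> + 'stmt>
    where
        F: FnMut(&(i64, String)) -> T + 'stmt,
    {
        Box::new(self.conn.rows.iter().map(f))
    }
}

struct Note {
    id: i64,
    title: String,
}

/// The "collect" workaround: drain the borrowing iterator while `stmt` is
/// still in scope, then hand back an iterator that owns its items.
fn list_notes(conn: &Conn) -> std::vec::IntoIter<Note> {
    let mut stmt = conn.prepare();
    let notes: Vec<Note> = stmt
        .query_map(|(id, title)| Note {
            id: *id,
            title: title.clone(),
        })
        .collect();
    // `notes` holds no borrow of `stmt`, so returning it is fine.
    notes.into_iter()
}
```

With real rusqlite rows each item is a Result, so the collect step would presumably be `collect::<Result<Vec<_>, _>>()?` instead.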
          
            • pr06lefsOP
              1 year ago

              I’ve been looking into it a bit: not pinning per se, but self-referential types. There’s a library called ouroboros that looks helpful. There’s even an example on GitHub where someone uses rusqlite and ouroboros together.

              So it seems like this should work:

              #[self_referencing]
              pub struct ZkNoteStream {
                conn: Connection,
                #[borrows(conn)]
                pstmt: rusqlite::Statement<'this>,
                #[borrows(mut pstmt)]
                #[covariant]
                rec_iter: rusqlite::Rows<'this>,
              }
              
              impl ZkNoteStream {
                pub fn init(conn: Connection, user: i64, search: &ZkNoteSearch) -> Result<Self, Box<dyn Error>> {
                  let (sql, args) = build_sql(&conn, user, search.clone())?;
              
                  Ok(
                    ZkNoteStreamTryBuilder {
                      conn: conn,
                      pstmt_builder: |conn: &Connection| conn.prepare(sql.as_str()),
                      rec_iter_builder: |pstmt: &mut rusqlite::Statement<'_>| {
                        pstmt.query(rusqlite::params_from_iter(args.iter()))
                      },
                    }
                    .try_build()?,
                  )
                }
              }
              

              Unfortunately I get this:

              error[E0597]: `pstmt` does not live long enough
                 --> server-lib/src/search.rs:880:1
                  |
              880 | #[self_referencing]
                  | ^^^^^^^^^^^^^^^^^^-
                  | |                 |
                  | |                 `pstmt` dropped here while still borrowed
                  | |                 borrow might be used here, when `pstmt` is dropped and runs the `Drop` code for type `Statement`
                  | borrowed value does not live long enough
                  |
                  = note: this error originates in the attribute macro `self_referencing` (in Nightly builds, run with -Z macro-backtrace for more info)
              

              So close! But no cigar so far. No idea why it’s complaining.

            • pr06lefsOP
              1 year ago

              Got this solved over on the rust-lang discourse. The solution was to use the async_stream crate. Ends up being a small amount of code too.
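
For intuition about why a generator-style stream gets around the self-referential problem, here is a std-only sketch. It is not the async_stream API and not the code from the discourse thread. `OwnedRows`, `YieldOnce` and `NoopWaker` are hypothetical names, and a Vec<String> stands in for the Connection. What it illustrates: an async block owns its locals, so the compiler builds and pins the state in which one local borrows another, with no unsafe in user code.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Future that is pending exactly once: the hand-off point after each item.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

/// Waker that does nothing; the iterator below polls eagerly instead.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Owns a pinned future plus the slot that future writes each item into.
struct OwnedRows {
    slot: Rc<RefCell<Option<String>>>,
    fut: Pin<Box<dyn Future<Output = ()>>>,
    done: bool,
}

impl OwnedRows {
    fn new(table: Vec<String>) -> Self {
        let slot: Rc<RefCell<Option<String>>> = Rc::new(RefCell::new(None));
        let out = Rc::clone(&slot);
        let fut: Pin<Box<dyn Future<Output = ()>>> = Box::pin(async move {
            // `table` lives inside the future, and the iterator borrowing it
            // is kept alive across every `.await` below.
            for row in table.iter() {
                *out.borrow_mut() = Some(row.to_uppercase());
                YieldOnce(false).await;
            }
        });
        OwnedRows { slot, fut, done: false }
    }
}

impl Iterator for OwnedRows {
    type Item = String;

    fn next(&mut self) -> Option<String> {
        if self.done {
            return None;
        }
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut cx = Context::from_waker(&waker);
        match self.fut.as_mut().poll(&mut cx) {
            // The future paused right after writing an item: take it.
            Poll::Pending => self.slot.borrow_mut().take(),
            // The future ran to completion: never poll it again.
            Poll::Ready(()) => {
                self.done = true;
                None
            }
        }
    }
}
```

`OwnedRows::new(rows)` can then be returned from a function and iterated freely, since the borrow it depends on lives inside the boxed future rather than in the caller's stack frame.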