Edit: solved with the help of the async_stream crate.

I’m struggling with the borrow checker!

My problem: I’m using actix-web and rusqlite. I want to return an unlimited number of records from a rusqlite query, and actix-web can stream a response from anything that implements the Stream trait (from the futures crate). You just impl the trait and return your records from a poll_next() fn.

On the rusqlite side, there’s this query_map that returns an iterator of records from a query. All I have to do is smush these two features together.

So the plan is to put the iterator returned by query_map into a struct that impls Stream. The problem is the lifetime of a variable that query_map borrows. How do I make that variable live as long as the iterator?

So here’s the code:

pub struct ZkNoteStream<'a, T> {
  rec_iter: Box<dyn Iterator<Item = T> + 'a>,
}

// impl of Stream just calls next() on the iterator.  This compiles fine.
impl<'a> Stream for ZkNoteStream<'a, serde_json::Value> {
  type Item = serde_json::Value;

  fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    Poll::Ready(self.rec_iter.next())
  }
}

// init function to set up the ZkNoteStream.
impl<'a> ZkNoteStream<'a, Result<ZkListNote, rusqlite::Error>> {
  pub fn init(
    conn: &'a Connection,
    user: i64,
    search: &ZkNoteSearch,
  ) -> Result<Self, Box<dyn Error>> {
    let (sql, args) = build_sql(&conn, user, search.clone())?;

    let sysid = user_id(&conn, "system")?;
    let mut pstmt = conn.prepare(sql.as_str())?;

    // Here's the problem!  Borrowing pstmt.
    let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
      let id = row.get(0)?;
      let sysids = get_sysids(&conn, sysid, id)?;
      Ok(ZkListNote {
        id: id,
        title: row.get(1)?,
        is_file: {
          let wat: Option<i64> = row.get(2)?;
          wat.is_some()
        },
        user: row.get(3)?,
        createdate: row.get(4)?,
        changeddate: row.get(5)?,
        sysids: sysids,
      })
    })?;

    Ok(ZkNoteStream::<Result<ZkListNote, rusqlite::Error>> {
      rec_iter: Box::new(rec_iter),
    })
  }
}

And here’s the error:

error[E0515]: cannot return value referencing local variable `pstmt`
   --> server-lib/src/search.rs:170:5
    |
153 |       let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
    |                      ----- `pstmt` is borrowed here
...
170 | /     Ok(ZkNoteStream::<Result<ZkListNote, rusqlite::Error>> {
171 | |       rec_iter: Box::new(rec_iter),
172 | |     })
    | |______^ returns a value referencing data owned by the current function

So basically it boils down to pstmt getting borrowed in the query_map call. It needs to have the same lifetime as the closure. How do I ensure that?
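
To make the borrow chain concrete, here is a minimal std-only sketch. The `Connection`, `Statement`, and `Rows` types below are hypothetical stand-ins that only mimic how rusqlite's types borrow from each other; they are not rusqlite's API. The sketch shows the arrangement the borrow checker accepts without extra machinery: whoever owns the statement has to outlive the iterator, so the statement cannot be a local of the function that returns the iterator.

```rust
// Hypothetical stand-ins for rusqlite's types; only the borrows are modeled.
struct Connection {
    titles: Vec<String>,
}

// A statement borrows the connection that prepared it.
struct Statement<'conn> {
    conn: &'conn Connection,
}

// A row cursor borrows the statement that produced it.
struct Rows<'stmt> {
    data: &'stmt [String],
    pos: usize,
}

impl Connection {
    fn prepare(&self) -> Statement<'_> {
        Statement { conn: self }
    }
}

impl<'conn> Statement<'conn> {
    // Like query_map, this takes `&mut self`, so the result is tied to the
    // borrow of the statement.
    fn query(&mut self) -> Rows<'_> {
        Rows { data: self.conn.titles.as_slice(), pos: 0 }
    }
}

impl<'stmt> Iterator for Rows<'stmt> {
    type Item = &'stmt str;

    fn next(&mut self) -> Option<Self::Item> {
        let data: &'stmt [String] = self.data;
        let item = data.get(self.pos)?;
        self.pos += 1;
        Some(item.as_str())
    }
}

// Accepted: the caller owns the statement, so it outlives the iterator.
fn titles<'s>(stmt: &'s mut Statement<'_>) -> Rows<'s> {
    stmt.query()
}

// Also accepted: the iterator is used up before the local `stmt` is dropped,
// and only owned data leaves the function.
fn first_title(conn: &Connection) -> Option<String> {
    let mut stmt = conn.prepare();
    let first = titles(&mut stmt).next().map(str::to_owned);
    first
}
```

What this sketch cannot express is a function that creates `stmt` locally and returns `Rows` borrowing from it, which is exactly what `init` above attempts.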

  • hallettj@beehaw.org
    1 year ago

    Ok I have a simple solution for you. What you need to do is to produce rec_iter in such a way that pstmt is dropped at the same time. You can do that by creating and calling pstmt in a block like this:

    let rec_iter = {
        let mut pstmt = conn.prepare(sql.as_str())?;
        pstmt.query_map(/* ... */)?
    };
    

    Here is a Rust Playground example

    • pr06lefsOP
      1 year ago

      So close! The problem is that query_map doesn’t consume pstmt, it only references it.

      error[E0515]: cannot return value referencing local variable `pstmt`
         --> server-lib/src/search.rs:191:5
          |
      162 |         let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
          |                        ----- `pstmt` is borrowed here
      ...
      191 | /     Ok(ZkNoteStream {
      192 | |       rec_iter: Box::new(bytes_iter),
      193 | |     })
          | |______^ returns a value referencing data owned by the current function
      

      (bytes_iter is rec_iter run through some map()s)

      • hallettj@beehaw.org
        1 year ago

        Did you try adding the block like I suggested? I reproduced the same error that you are seeing, and verified in my playground snippet that the block fixes the problem.

        It’s ok that query_map references pstmt. Isolating pstmt in a block causes it to be dropped at the end of the block while rec_iter continues to live. I think that might be thanks to temporary lifetime extension or something.

        • pr06lefsOP
          1 year ago

          Yep, gave that a try (I think!). Here’s that version.

          pub struct ZkNoteStream<'a> {
            rec_iter: Box<dyn Iterator<Item = Bytes> + 'a>,
          }
          
          impl<'a> ZkNoteStream<'a> {
            pub fn init(conn: Connection, user: i64, search: &ZkNoteSearch) -> Result<Self, Box<dyn Error>> {
              let (sql, args) = build_sql(&conn, user, search.clone())?;
          
              let sysid = user_id(&conn, "system")?;
          
              let bytes_iter = {
                let mut pstmt = conn.prepare(sql.as_str())?;
                let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
                  let id = row.get(0)?;
                  Ok(ZkListNote {
                    id: id,
                    title: row.get(1)?,
                    is_file: {
                      let wat: Option<i64> = row.get(2)?;
                      wat.is_some()
                    },
                    user: row.get(3)?,
                    createdate: row.get(4)?,
                    changeddate: row.get(5)?,
                    sysids: Vec::new(),
                  })
                })?;
          
                let val_iter = rec_iter
                  .filter_map(|x| x.ok())
                  .map(|x| serde_json::to_value(x).map_err(|e| e.into()));
          
                val_iter
                  .filter_map(|x: Result<serde_json::Value, orgauth::error::Error>| x.ok())
                  .map(|x| Bytes::from(x.to_string()))
              };
          
              Ok(ZkNoteStream {
                rec_iter: Box::new(bytes_iter),
              })
            }
          }
          
          impl<'a> Stream for ZkNoteStream<'a> {
            type Item = Bytes;
          
            fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
              Poll::Ready(self.rec_iter.next())
            }
          }
          

          This gets two errors, one for the conn and one for the pstmt:

          error[E0515]: cannot return value referencing local variable `pstmt`
             --> server-lib/src/search.rs:181:5
              |
          153 |         let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
              |                        ----- `pstmt` is borrowed here
          ...
          181 | /     Ok(ZkNoteStream {
          182 | |       rec_iter: Box::new(bytes_iter),
          183 | |     })
              | |______^ returns a value referencing data owned by the current function
          
          error[E0515]: cannot return value referencing function parameter `conn`
             --> server-lib/src/search.rs:181:5
              |
          152 |         let mut pstmt = conn.prepare(sql.as_str())?;
              |                         ---- `conn` is borrowed here
          ...
          181 | /     Ok(ZkNoteStream {
          182 | |       rec_iter: Box::new(bytes_iter),
          183 | |     })
              | |______^ returns a value referencing data owned by the current function
          
          • hallettj@beehaw.org
            1 year ago

            Oh I’m sorry! I messed up my test case so it only looked like the block fixed things.

            • pr06lefsOP
              1 year ago

              I’m not quite ready to give up, but it’s not looking good. One of the rusqlite maintainers issued this haiku-like missive:

              yeah you just have to collect
              you can't return that as an iterator
              it needs to borrow from the statement
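
A minimal sketch of what "just collect" looks like, using hypothetical stand-in types rather than rusqlite itself (the `Connection`, `Statement`, and `query_titles` names are assumptions for illustration). The rows are copied into an owned `Vec` while the statement is still alive, and the function returns the `Vec`'s own iterator, which borrows nothing.

```rust
// Hypothetical stand-ins for rusqlite's types; only the borrows are modeled.
struct Connection {
    titles: Vec<String>,
}

struct Statement<'conn> {
    conn: &'conn Connection,
}

impl Connection {
    fn prepare(&self) -> Statement<'_> {
        Statement { conn: self }
    }
}

impl<'conn> Statement<'conn> {
    // The returned iterator borrows the statement, as query_map's does.
    fn query_titles(&mut self) -> Box<dyn Iterator<Item = String> + '_> {
        Box::new(self.conn.titles.iter().cloned())
    }
}

// Collect every row into owned storage before the statement goes away,
// then hand back an iterator that owns its data.
fn note_titles(conn: &Connection) -> std::vec::IntoIter<String> {
    let mut stmt = conn.prepare();
    let rows: Vec<String> = stmt.query_titles().collect();
    rows.into_iter()
}
```

The cost is that every row sits in memory at once, which works against the goal of returning an unlimited number of records.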
              

              Got to wondering how Vec does this iterator-with-internal-state thing, and it’s with unsafe.

              update from the maintainer on discord:

              fundamentally you're asking for a self-referential type. e.g. one field borrows from another field of the same struct. cant be done without unsafe
              very easy to have soundness holes even if you use unsafe
              
                • pr06lefsOP
                  1 year ago

                  I’ve been looking into it a bit - not pinning per se but self-referential types. There’s a library called ouroboros that looks helpful. There’s even an example on GitHub where someone uses rusqlite and ouroboros together.

                  So it seems like this should work:

                  #[self_referencing]
                  pub struct ZkNoteStream {
                    conn: Connection,
                    #[borrows(conn)]
                    pstmt: rusqlite::Statement<'this>,
                    #[borrows(mut pstmt)]
                    #[covariant]
                    rec_iter: rusqlite::Rows<'this>,
                  }
                  
                  impl ZkNoteStream {
                    pub fn init(conn: Connection, user: i64, search: &ZkNoteSearch) -> Result<Self, Box<dyn Error>> {
                      let (sql, args) = build_sql(&conn, user, search.clone())?;
                  
                      Ok(
                        ZkNoteStreamTryBuilder {
                          conn: conn,
                          pstmt_builder: |conn: &Connection| conn.prepare(sql.as_str()),
                          rec_iter_builder: |pstmt: &mut rusqlite::Statement<'_>| {
                            pstmt.query(rusqlite::params_from_iter(args.iter()))
                          },
                        }
                        .try_build()?,
                      )
                    }
                  }
                  

                  Unfortunately I get this:

                  error[E0597]: `pstmt` does not live long enough
                     --> server-lib/src/search.rs:880:1
                      |
                  880 | #[self_referencing]
                      | ^^^^^^^^^^^^^^^^^^-
                      | |                 |
                      | |                 `pstmt` dropped here while still borrowed
                      | |                 borrow might be used here, when `pstmt` is dropped and runs the `Drop` code for type `Statement`
                      | borrowed value does not live long enough
                      |
                      = note: this error originates in the attribute macro `self_referencing` (in Nightly builds, run with -Z macro-backtrace for more info)
                  

                  So close! But no cigar so far. No idea why it’s complaining.

                • pr06lefsOP
                  1 year ago

                  Got this solved over on the rust-lang discourse. The solution was to use the async_stream crate. Ends up being a small amount of code too.
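
The final async_stream code is not shown in this thread, so the following is not that solution. It is a std-only analogy built on one assumption about why a generator-style approach helps: the producer owns the connection, the statement, and the row cursor together for as long as rows are being produced, and the consumer only ever receives owned values. Here the producer is a thread and the hand-off is a bounded channel. All type and function names are hypothetical stand-ins, not rusqlite's API.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-ins for rusqlite's types; only the borrows are modeled.
struct Connection {
    titles: Vec<String>,
}

struct Statement<'conn> {
    conn: &'conn Connection,
}

impl Connection {
    fn prepare(&self) -> Statement<'_> {
        Statement { conn: self }
    }
}

impl<'conn> Statement<'conn> {
    // The returned iterator borrows the statement, as query_map's does.
    fn query_titles(&mut self) -> Box<dyn Iterator<Item = String> + '_> {
        Box::new(self.conn.titles.iter().cloned())
    }
}

// The producer thread owns `conn` and keeps `stmt` and the row iterator on
// its own stack, so nothing borrowed ever crosses the function boundary.
fn stream_titles(conn: Connection) -> mpsc::IntoIter<String> {
    // Bounded channel: the producer blocks once 16 rows are waiting.
    let (tx, rx) = mpsc::sync_channel(16);
    thread::spawn(move || {
        let mut stmt = conn.prepare();
        for title in stmt.query_titles() {
            // Stop early if the receiving side has been dropped.
            if tx.send(title).is_err() {
                break;
            }
        }
    });
    rx.into_iter()
}
```

Unlike collecting into a `Vec`, at most a bounded number of rows is buffered at any time, at the price of one extra thread per query.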