Edit: solved with the help of the async_stream crate.

I’m struggling with the borrow checker!

My problem: I’m using actix-web and rusqlite. I want to return an unlimited number of records from an rusqlite query, and actix provides a Stream trait for that kind of thing. You just impl the trait and return your records from a poll_next() fn.

On the rusqlite side, there’s this query_map that returns an iterator of records from a query. All I have to do is smush these two features together.

So the plan is to put the iterator returned by query_map into a struct that impls Stream. The problem is the lifetime of the prepared statement that query_map borrows. How do I make the statement live as long as the iterator?

So here’s the code:

pub struct ZkNoteStream<'a, T> {
  rec_iter: Box<dyn Iterator<Item = T> + 'a>,
}

// impl of Stream just calls next() on the iterator.  This compiles fine.
impl<'a> Stream for ZkNoteStream<'a, serde_json::Value> {
  type Item = serde_json::Value;

  fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    Poll::Ready(self.rec_iter.next())
  }
}

// init function to set up the ZkNoteStream.
impl<'a> ZkNoteStream<'a, Result<ZkListNote, rusqlite::Error>> {
  pub fn init(
    conn: &'a Connection,
    user: i64,
    search: &ZkNoteSearch,
  ) -> Result<Self, Box<dyn Error>> {
    let (sql, args) = build_sql(&conn, user, search.clone())?;

    let sysid = user_id(&conn, "system")?;
    let mut pstmt = conn.prepare(sql.as_str())?;

    // Here's the problem!  Borrowing pstmt.
    let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
      let id = row.get(0)?;
      let sysids = get_sysids(&conn, sysid, id)?;
      Ok(ZkListNote {
        id: id,
        title: row.get(1)?,
        is_file: {
          let wat: Option<i64> = row.get(2)?;
          wat.is_some()
        },
        user: row.get(3)?,
        createdate: row.get(4)?,
        changeddate: row.get(5)?,
        sysids: sysids,
      })
    })?;

    Ok(ZkNoteStream::<Result<ZkListNote, rusqlite::Error>> {
      rec_iter: Box::new(rec_iter),
    })
  }
}

And here’s the error:

error[E0515]: cannot return value referencing local variable `pstmt`
   --> server-lib/src/search.rs:170:5
    |
153 |       let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
    |                      ----- `pstmt` is borrowed here
...
170 | /     Ok(ZkNoteStream::<Result<ZkListNote, rusqlite::Error>> {
171 | |       rec_iter: Box::new(rec_iter),
172 | |     })
    | |______^ returns a value referencing data owned by the current function

So basically it boils down to pstmt getting borrowed in the query_map call. It needs to live as long as the iterator that query_map returns. How do I ensure that?

  • pr06lefsOP
    1 year ago

    Yep, gave that a try (I think!). Here’s that version.

    pub struct ZkNoteStream<'a> {
      rec_iter: Box<dyn Iterator<Item = Bytes> + 'a>,
    }
    
    impl<'a> ZkNoteStream<'a> {
      pub fn init(conn: Connection, user: i64, search: &ZkNoteSearch) -> Result<Self, Box<dyn Error>> {
        let (sql, args) = build_sql(&conn, user, search.clone())?;
    
        let sysid = user_id(&conn, "system")?;
    
        let bytes_iter = {
          let mut pstmt = conn.prepare(sql.as_str())?;
          let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
            let id = row.get(0)?;
            Ok(ZkListNote {
              id: id,
              title: row.get(1)?,
              is_file: {
                let wat: Option<i64> = row.get(2)?;
                wat.is_some()
              },
              user: row.get(3)?,
              createdate: row.get(4)?,
              changeddate: row.get(5)?,
              sysids: Vec::new(),
            })
          })?;
    
          let val_iter = rec_iter
            .filter_map(|x| x.ok())
            .map(|x| serde_json::to_value(x).map_err(|e| e.into()));
    
          val_iter
            .filter_map(|x: Result<serde_json::Value, orgauth::error::Error>| x.ok())
            .map(|x| Bytes::from(x.to_string()))
        };
    
        Ok(ZkNoteStream {
          rec_iter: Box::new(bytes_iter),
        })
      }
    }
    
    impl<'a> Stream for ZkNoteStream<'a> {
      type Item = Bytes;
    
      fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Poll::Ready(self.rec_iter.next())
      }
    }
    

    This gets two errors, one for the conn and one for the pstmt:

    error[E0515]: cannot return value referencing local variable `pstmt`
       --> server-lib/src/search.rs:181:5
        |
    153 |         let rec_iter = pstmt.query_map(rusqlite::params_from_iter(args.iter()), move |row| {
        |                        ----- `pstmt` is borrowed here
    ...
    181 | /     Ok(ZkNoteStream {
    182 | |       rec_iter: Box::new(bytes_iter),
    183 | |     })
        | |______^ returns a value referencing data owned by the current function
    
    error[E0515]: cannot return value referencing function parameter `conn`
       --> server-lib/src/search.rs:181:5
        |
    152 |         let mut pstmt = conn.prepare(sql.as_str())?;
        |                         ---- `conn` is borrowed here
    ...
    181 | /     Ok(ZkNoteStream {
    182 | |       rec_iter: Box::new(bytes_iter),
    183 | |     })
        | |______^ returns a value referencing data owned by the current function
    
    • hallettj@beehaw.org
      1 year ago

      Oh I’m sorry! I messed up my test case so it only looked like the block fixed things.

      • pr06lefsOP
        1 year ago

        I’m not quite ready to give up, but it’s not looking good. One of the rusqlite maintainers issued this haiku-like missive:

        yeah you just have to collect
        you can't return that as an iterator
        it needs to borrow from the statement
        

        Got to wondering how Vec does this iterator-with-internal-state thing, and it’s with unsafe.

        update from the maintainer on discord:

        fundamentally you're asking for a self-referential type. e.g. one field borrows from another field of the same struct. cant be done without unsafe
        very easy to have soundness holes even if you use unsafe
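
For anyone skimming, here is a minimal sketch of what the "just collect" advice amounts to. It does not use rusqlite: `Conn`, `Stmt`, `Rows`, `Note` and `load_notes` are made-up stand-ins, chosen only so that `Rows` borrows `Stmt` and `Stmt` borrows `Conn` the way the real types do. Treat it as an illustration of the borrowing shape, not as the rusqlite API.

```rust
// Hypothetical stand-ins for rusqlite's Connection, Statement and row iterator.
#[derive(Debug, Clone, PartialEq)]
struct Note {
    id: i64,
    title: String,
}

struct Conn {
    notes: Vec<Note>,
}

// The statement borrows the connection.
struct Stmt<'c> {
    conn: &'c Conn,
    pos: usize,
}

// The row iterator mutably borrows the statement.
struct Rows<'s, 'c> {
    stmt: &'s mut Stmt<'c>,
}

impl Conn {
    fn prepare(&self) -> Stmt<'_> {
        Stmt { conn: self, pos: 0 }
    }
}

impl<'c> Stmt<'c> {
    fn query(&mut self) -> Rows<'_, 'c> {
        Rows { stmt: self }
    }
}

impl Iterator for Rows<'_, '_> {
    type Item = Note;

    fn next(&mut self) -> Option<Note> {
        let note = self.stmt.conn.notes.get(self.stmt.pos)?.clone();
        self.stmt.pos += 1;
        Some(note)
    }
}

// Collect every row into an owned Vec while the statement is still alive,
// then hand back an iterator over the Vec. Nothing returned borrows `stmt`.
fn load_notes(conn: &Conn) -> Box<dyn Iterator<Item = Note>> {
    let mut stmt = conn.prepare();
    let notes: Vec<Note> = stmt.query().collect();
    Box::new(notes.into_iter())
}
```

The catch is that every row sits in memory before the first one is returned, so this gives up the streaming that the original question was after.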
        
          • pr06lefsOP
            1 year ago

            I’ve been looking into it a bit: not pinning per se, but self-referential types. There’s a library called ouroboros that looks helpful. There’s even an example on GitHub where someone uses rusqlite and ouroboros together.

            So it seems like this should work:

            #[self_referencing]
            pub struct ZkNoteStream {
              conn: Connection,
              #[borrows(conn)]
              pstmt: rusqlite::Statement<'this>,
              #[borrows(mut pstmt)]
              #[covariant]
              rec_iter: rusqlite::Rows<'this>,
            }
            
            impl ZkNoteStream {
              pub fn init(conn: Connection, user: i64, search: &ZkNoteSearch) -> Result<Self, Box<dyn Error>> {
                let (sql, args) = build_sql(&conn, user, search.clone())?;
            
                Ok(
                  ZkNoteStreamTryBuilder {
                    conn: conn,
                    pstmt_builder: |conn: &Connection| conn.prepare(sql.as_str()),
                    rec_iter_builder: |pstmt: &mut rusqlite::Statement<'_>| {
                      pstmt.query(rusqlite::params_from_iter(args.iter()))
                    },
                  }
                  .try_build()?,
                )
              }
            }
            

            Unfortunately I get this:

            error[E0597]: `pstmt` does not live long enough
               --> server-lib/src/search.rs:880:1
                |
            880 | #[self_referencing]
                | ^^^^^^^^^^^^^^^^^^-
                | |                 |
                | |                 `pstmt` dropped here while still borrowed
                | |                 borrow might be used here, when `pstmt` is dropped and runs the `Drop` code for type `Statement`
                | borrowed value does not live long enough
                |
                = note: this error originates in the attribute macro `self_referencing` (in Nightly builds, run with -Z macro-backtrace for more info)
            

            So close! But no cigar so far. No idea why it’s complaining.

          • pr06lefsOP
            11 months ago

            Got this solved over on the rust-lang discourse. The solution was to use the async_stream crate. Ends up being a small amount of code too.
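
This is not the code from the discourse thread, just a std-only sketch of why an async-block-based approach can get around the self-referential problem, as far as I understand it. An async block may keep borrows of its own locals alive across suspension points, and the compiler builds the pinned state machine for you. Every name here (`Conn`, `Stmt`, `Rows`, `YieldOnce`, `NoopWake`, `RowStream`, `row_stream`) is made up for illustration, and a plain `Iterator` stands in for `Stream`. Real code would use the `stream!` or `try_stream!` macros from async_stream with `yield` instead of the hand-rolled slot below.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-ins for rusqlite's Connection, Statement and Rows.
struct Conn { ids: Vec<i64> }
struct Stmt<'c> { conn: &'c Conn, pos: usize }
struct Rows<'s, 'c> { stmt: &'s mut Stmt<'c> }

impl Conn {
    fn prepare(&self) -> Stmt<'_> { Stmt { conn: self, pos: 0 } }
}
impl<'c> Stmt<'c> {
    fn query(&mut self) -> Rows<'_, 'c> { Rows { stmt: self } }
}
impl Iterator for Rows<'_, '_> {
    type Item = i64;
    fn next(&mut self) -> Option<i64> {
        let id = *self.stmt.conn.ids.get(self.stmt.pos)?;
        self.stmt.pos += 1;
        Some(id)
    }
}

// A future that suspends exactly once: a hand-written yield point.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 { Poll::Ready(()) } else { self.0 = true; Poll::Pending }
    }
}

// A waker that does nothing; enough to poll by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Owns the pinned async block plus a slot the block writes each row into.
struct RowStream {
    fut: Pin<Box<dyn Future<Output = ()>>>,
    slot: Rc<Cell<Option<i64>>>,
    done: bool,
}

fn row_stream(conn: Conn) -> RowStream {
    let slot: Rc<Cell<Option<i64>>> = Rc::new(Cell::new(None));
    let tx = Rc::clone(&slot);
    let fut: Pin<Box<dyn Future<Output = ()>>> = Box::pin(async move {
        // conn, stmt and rows all live inside the future, so these
        // borrows never have to escape a function body.
        let mut stmt = conn.prepare();
        let rows = stmt.query();
        for id in rows {
            tx.set(Some(id));
            YieldOnce(false).await; // suspend with the borrows still held
        }
    });
    RowStream { fut, slot, done: false }
}

impl Iterator for RowStream {
    type Item = i64;
    fn next(&mut self) -> Option<i64> {
        if self.done {
            return None;
        }
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        if self.fut.as_mut().poll(&mut cx).is_ready() {
            self.done = true; // never poll a finished future again
        }
        self.slot.take()
    }
}
```

Rows are produced one at a time as the consumer asks for them, which is the part the collect approach gives up.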