Welcome to WuJiGu Developer Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
313 views
in Technique[技术] by (71.8m points)

asynchronous - Temporarily suspend or deactivate an interval stream used in futures::select

I'm currently writing an application that is heavily based on repeating tasks at well-defined frequencies. As such, I chose to set up my main loop something like this:

use async_std::stram::interval;
use futures::{pin_mut, select, stream::StreamExt};

let interval_a = interval(duration_a).fuse();
let interval_b = interval(duration_b).fuse();
pin_mut!(interval_a, interval_b);

loop {
  select! {
    _ = interval_a.next() => {
      // perform task A
    }
    _ = interval_b.next() => {
      // perform task B
    }
    complete => break,
    default => std::thread::sleep(some_time), // though due to the async select! this may not yield much benefit
  }
}

The problem now is that I have one task that needs to be executed at a high frequency (let's say task B), but only if a certain condition is met (which is determined in another interval, e.g. task A).

A straightforward way is of course to include an if condition in task B to simply continue; but I was wondering if there's a more elegant way to suspend interval_b instead, so that it won't get select!ed unless it's relevant.

My first impulse was to initialize the respective stream with let mut interval_b = futures::stream::pending().fuse(), then if the relevant condition in task A is met, I'd assign the proper interval_b = interval(duration_b), or reset it to a Pending if the condition is not met any more after a while. However, this leads to typing issues, as a Fuse<Pending<Interval>> is not a Fuse<Interval>, so one can not suddenly assign a value of the latter to a variable of the former.

I was wondering if some dyn-based type declaration was possible, i.e.

let mut interval_b: 
  Box<dyn mut futures::stream::FusedStream<Item = async_std::stream::Interval>>>
  = Box::new(futures::stream::pending().fuse())`

Yet while I can declare interval_b this way, I cannot figure out how to assign a boxed fused interval to it and run into issues with the pinning of the boxed value in the select! as well.

So I've been wondering, is what I want to achieve even possible in Rust, and if so, what's the issue with my present approaches.

question from:https://stackoverflow.com/questions/65903440/temporarily-suspend-or-deactivate-an-interval-stream-used-in-futuresselect

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)
Waitting for answers

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to WuJiGu Developer Q&A Community for programmer and developer-Open, Learning and Share
...