Actor

Actix是一个Rust库,为开发并发应用程序提供了框架。

Actix建立在Actor Model上。允许将应用程序编写为一组独立执行但合作的应用程序 通过消息进行通信的"Actor"。 Actor是封装的对象状态和行为,并在actix库提供的Actor System中运行。

Actor在特定的执行上下文Context中运行,上下文对象仅在执行期间可用。每个Actor都有一个单独的执行上下文。执行上下文还控制actor的生命周期。

Actor通过交换消息进行通信。分派Actor可以可选择等待响应。Actor不是直接引用,而是通过引用地址

任何Rust类型都可以是一个actor,它只需要实现Actor trait。

为了能够处理特定消息actor必须提供的此消息的Handler实现。所有消息是静态类型的。消息可以以异步方式处理。Actor可以生成其他actor或将future/stream添加到执行上下文。Actor trait提供了几种允许控制actor生命周期的方法。

Actor生命周期

Started

actor总是以Started状态开始。在这种状态下,actorstarted()方法被调用。 Actor trait为此方法提供了默认实现。在此状态期间可以使用actor上下文,并且actor可以启动更多actor或注册异步流或执行任何其他所需的配置。

Running

调用Actor的started()方法后,actor转换为Running状态。actor可以一直处于running状态。

Stopping

在以下情况下,Actor的执行状态将更改为stopping状态:

  • Context :: stop由actor本身调用
  • actor的所有地址都被消毁。即没有其他Actor引用它。
  • 在上下文中没有注册事件对象。

一个actor可以通过创建一个新的地址或添加事件对象,并返回Running :: Continue,从而使stopped状态恢复到running状态,。

如果一个actor因为调用了Context :: stop()状态转换为stop,则上下文立即停止处理传入的消息并调用Actor::stopping()。如果actor没有恢复到running状态,那么全部未处理的消息被删除。

默认情况下,此方法返回Running :: Stop,确认停止操作。

Stopped

如果actor在停止状态期间没有修改执行上下文,则actor状态会转换到Stopped。这种状态被认为是最终状态,此时actor被消毁

Message

Actor通过发送消息与其他actor通信。在actix中所有消息是typed。消息可以是实现Message trait的任何rust类型。 Message :: Result定义返回类型。让我们定义一个简单的Ping消息 - 接受此消息的actor需要返回io::Result<bool>

extern crate actix;
use std::io;
use actix::prelude::*;
struct Ping;
impl Message for Ping {
    type Result = Result<bool, io::Error>;
}
fn main() {}

生成actor

如何开始一个actor取决于它的上下文(context)。产生一个新的异步actor是通过实现Actortrait 的startcreate方法。它提供了几种不同的方式创造Actor;有关详细信息,请查看文档。

完整的例子

use std::io;
use actix::prelude::*;
use futures::Future;
/// Define message
struct Ping;
impl Message for Ping {
    type Result = Result<bool, io::Error>;
}
// Define actor
struct MyActor;
// Provide Actor implementation for our actor
impl Actor for MyActor {
    type Context = Context<Self>;
    fn started(&mut self, ctx: &mut Context<Self>) {
       println!("Actor is alive");
    }
    fn stopped(&mut self, ctx: &mut Context<Self>) {
       println!("Actor is stopped");
    }
}
/// Define handler for `Ping` message
impl Handler<Ping> for MyActor {
    type Result = Result<bool, io::Error>;
    fn handle(&mut self, msg: Ping, ctx: &mut Context<Self>) -> Self::Result {
        println!("Ping received");
        Ok(true)
    }
}
fn main() {
    let sys = System::new("example");
    // Start MyActor in current thread
    let addr = MyActor.start();
    // Send Ping message.
    // send() message returns Future object, that resolves to message result
    let result = addr.send(Ping);
    // spawn future to reactor
    Arbiter::spawn(
        result.map(|res| {
            match res {
                Ok(result) => println!("Got result: {}", result),
                Err(err) => println!("Got error: {}", err),
            }
        })
        .map_err(|e| {
            println!("Actor is probably died: {}", e);
        }));
    sys.run();
}

使用MessageResponse进行响应

让我们看看上面例子中为impl Handler定义的Result类型。看看我们如何返回Result <bool,io :: Error>?我们能够用这种类型响应我们的actor的传入消息,因为它具有为该类型实现的MessageResponse trait。这是该 trait的定义:

pub trait MessageResponse <A:Actor,M:Message> {
    fn handle <R:ResponseChannel <M >>self,ctx:&mut A :: Context,tx:Option <R>;
}

有时,使用没有为其实现此 trait的类型响应传入消息是有意义的。当发生这种情况时,我们可以自己实现这一 trait。这是一个例子,我们用GotPing回复Ping消息,并用GotPong回复Pong消息。

use actix::dev::{MessageResponse, ResponseChannel};
use actix::prelude::*;
use futures::Future;
enum Messages {
    Ping,
    Pong,
}
enum Responses {
    GotPing,
    GotPong,
}
impl<A, M> MessageResponse<A, M> for Responses
where
    A: Actor,
    M: Message<Result = Responses>,
{
    fn handle<R: ResponseChannel<M>>(self, _: &mut A::Context, tx: Option<R>) {
        if let Some(tx) = tx {
            tx.send(self);
        }
    }
}
impl Message for Messages {
    type Result = Responses;
}
// Define actor
struct MyActor;
// Provide Actor implementation for our actor
impl Actor for MyActor {
    type Context = Context<Self>;
    fn started(&mut self, ctx: &mut Context<Self>) {
        println!("Actor is alive");
    }
    fn stopped(&mut self, ctx: &mut Context<Self>) {
        println!("Actor is stopped");
    }
}
/// Define handler for `Messages` enum
impl Handler<Messages> for MyActor {
    type Result = Responses;
    fn handle(&mut self, msg: Messages, ctx: &mut Context<Self>) -> Self::Result {
        match msg {
            Messages::Ping => Responses::GotPing,
            Messages::Pong => Responses::GotPong,
        }
    }
}
fn main() {
    let sys = System::new("example");
    // Start MyActor in current thread
    let addr = MyActor.start();
    // Send Ping message.
    // send() message returns Future object, that resolves to message result
    let ping_future = addr.send(Messages::Ping);
    let pong_future = addr.send(Messages::Pong);
    // Spawn pong_future onto event loop
    Arbiter::spawn(
        pong_future
            .map(|res| {
                match res {
                    Responses::GotPing => println!("Ping received"),
                    Responses::GotPong => println!("Pong received"),
                }
            })
            .map_err(|e| {
                println!("Actor is probably died: {}", e);
            }),
    );
    // Spawn ping_future onto event loop
    Arbiter::spawn(
        ping_future
            .map(|res| {
                match res {
                    Responses::GotPing => println!("Ping received"),
                    Responses::GotPong => println!("Pong received"),
                }
            })
            .map_err(|e| {
                println!("Actor is probably died: {}", e);
            }),
    );
    sys.run();
}