# actix
## Why can we send a msg through an address?
ContextFut::poll (in contextimpl) calls self.mailbox.poll(...), which keeps reading msgs and running each msg's handle method; that is how a Msg actually gets received. Every address shares a single Inner via an Arc and keeps pushing msgs into it.
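Here is a rough standalone model of that loop (not the actix source; Ping, MyActor and this toy Mailbox are made up for illustration): the mailbox drains its queue and runs the actor's handler for every message it pops.

```rust
// Rough standalone model of the mailbox poll loop described above.
// MyActor, Ping and this toy Mailbox are illustrative only, not actix types.
use std::collections::VecDeque;

struct Ping;

struct MyActor {
    count: usize,
}

impl MyActor {
    // Stand-in for Handler::handle.
    fn handle(&mut self, _msg: Ping) {
        self.count += 1;
    }
}

struct Mailbox {
    queue: VecDeque<Ping>,
}

impl Mailbox {
    // Analogous to Mailbox::poll being driven from ContextFut::poll:
    // keep reading until the queue is empty, dispatching every message.
    fn poll(&mut self, act: &mut MyActor) {
        while let Some(msg) = self.queue.pop_front() {
            act.handle(msg);
        }
    }
}

fn main() {
    let mut act = MyActor { count: 0 };
    let mut mb = Mailbox {
        queue: VecDeque::from(vec![Ping, Ping]),
    };
    mb.poll(&mut act);
    assert_eq!(act.count, 2);
}
```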
## What does AddressSenderProducer's sender() do?
It increments num_senders and constructs an AddressSender that shares the same Inner; atomic operations are used simply because this can happen from multiple threads.
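A minimal sketch of that pattern, assuming it boils down to a CAS increment on the shared counter plus an Arc clone (Inner and Sender here are simplified stand-ins, not the actix definitions, and compare_exchange stands in for the older compare_and_swap):

```rust
// Simplified stand-ins for Inner<A> / AddressSender<A>; illustration only.
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::Arc;

struct Inner {
    num_senders: AtomicUsize,
}

struct Sender {
    inner: Arc<Inner>,
}

// Roughly: bump num_senders, then hand out another sender that shares Inner.
fn new_sender(inner: &Arc<Inner>, max_senders: usize) -> Sender {
    let mut curr = inner.num_senders.load(SeqCst);
    loop {
        assert!(curr < max_senders, "too many outstanding senders");
        match inner
            .num_senders
            .compare_exchange(curr, curr + 1, SeqCst, SeqCst)
        {
            Ok(_) => return Sender { inner: Arc::clone(inner) },
            Err(actual) => curr = actual, // someone else raced us, retry
        }
    }
}

fn main() {
    let inner = Arc::new(Inner { num_senders: AtomicUsize::new(1) });
    let _tx = new_sender(&inner, 2048);
    assert_eq!(inner.num_senders.load(SeqCst), 2);
}
```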
## What does task::current() do?
This version of actix sits on the futures 0.1 + tokio 0.1 stack, and I have not fully worked through the internals, but the gist is that it grabs a handle to the current task (the waker of that era) so the corresponding actor's task can later be woken via notify. The timer example in the Rust async book makes the pattern clear.
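A toy futures 0.1 example of that park/notify pattern, modeled on the async book's timer (this is not actix code; Delay and Shared are made up for illustration): the first poll stashes task::current(), and a background thread later calls notify() on it so the executor polls again.

```rust
// Toy futures 0.1 example (not actix code), modeled on the async book's
// timer: poll() stores task::current(), a background thread calls notify().
extern crate futures; // futures = "0.1"

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use futures::task::{self, Task};
use futures::{Async, Future, Poll};

struct Shared {
    done: bool,
    task: Option<Task>,
}

struct Delay {
    shared: Arc<Mutex<Shared>>,
}

impl Future for Delay {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        let mut shared = self.shared.lock().unwrap();
        if shared.done {
            Ok(Async::Ready(()))
        } else {
            // Same idea as `recv_task.task = Some(task::current())` in actix:
            // remember which task to wake up later.
            shared.task = Some(task::current());
            Ok(Async::NotReady)
        }
    }
}

fn delay(dur: Duration) -> Delay {
    let shared = Arc::new(Mutex::new(Shared { done: false, task: None }));
    let inner = Arc::clone(&shared);
    thread::spawn(move || {
        thread::sleep(dur);
        let mut shared = inner.lock().unwrap();
        shared.done = true;
        if let Some(task) = shared.task.take() {
            // Same role as task.notify() in AddressSender::signal().
            task.notify();
        }
    });
    Delay { shared }
}

fn main() {
    // wait() blocks the current thread until the timer thread notifies us.
    delay(Duration::from_millis(50)).wait().unwrap();
    println!("woken up via task.notify()");
}
```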
## What happens when an Actor starts?
See actix.pdf.
## What happens when, while handling a Msg, we notify ourselves with a msg through ctx?
Functions like these essentially just call spawn directly:
```rust
/// Sends the message `msg` to self.
fn notify<M>(&mut self, msg: M)
where
    A: Handler<M>,
    M: Message + 'static,
{
    if self.state() == ActorState::Stopped {
        error!("Context::notify called for stopped actor.");
    } else {
        self.spawn(ActorMessageItem::new(msg));
    }
}

fn notify_later<M>(&mut self, msg: M, after: Duration) -> SpawnHandle
where
    A: Handler<M>,
    M: Message + 'static,
{
    if self.state() == ActorState::Stopped {
        error!("Context::notify_later called for stopped actor.");
        SpawnHandle::default()
    } else {
        self.spawn(ActorDelayedMessageItem::new(msg, after))
    }
}

fn run_later<F>(&mut self, dur: Duration, f: F) -> SpawnHandle
where
    F: FnOnce(&mut A, &mut A::Context) + 'static,
{
    self.spawn(TimerFunc::new(dur, f))
}

/// Spawns a job to execute the given closure periodically, at a
/// specified fixed interval.
fn run_interval<F>(&mut self, dur: Duration, f: F) -> SpawnHandle
where
    F: FnMut(&mut A, &mut A::Context) + 'static,
{
    self.spawn(IntervalFunc::new(dur, f).finish())
}
```
As for spawn itself, it simply pushes the future into ContextFut's items; see ContextFut::poll, which in turn polls self.items, i.e. it actually drives these futures (a simplified model follows the snippet below).
```rust
#[inline]
/// Spawn new future to this context.
pub fn spawn<F>(&mut self, fut: F) -> SpawnHandle
where
    F: ActorFuture<Item = (), Error = (), Actor = A> + 'static,
{
    let handle = self.handles[0].next();
    self.handles[0] = handle;
    let fut: Box<ActorFuture<Item = (), Error = (), Actor = A>> = Box::new(fut);
    self.items.push((handle, fut));
    handle
}
```
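Here is a rough standalone model of that items loop (not the actix source; MyActor, ContextItems and the closure-based Item are simplified stand-ins for the real items vector of boxed ActorFutures): every spawned job is run against the actor, and jobs that report completion are dropped.

```rust
// Simplified stand-ins: a "future" is just a closure that returns true when done.
struct MyActor {
    ticks: usize,
}

type Item = Box<dyn FnMut(&mut MyActor) -> bool>;

struct ContextItems {
    items: Vec<(usize, Item)>,
    next_handle: usize,
}

impl ContextItems {
    // Mirrors spawn(): hand out a handle and stash the job in items.
    fn spawn(&mut self, fut: Item) -> usize {
        let handle = self.next_handle;
        self.next_handle += 1;
        self.items.push((handle, fut));
        handle
    }

    // Mirrors the loop over self.items in ContextFut::poll:
    // run every stored job and drop the ones that report completion.
    fn poll(&mut self, act: &mut MyActor) {
        self.items.retain_mut(|(_, fut)| !fut(act));
    }
}

fn main() {
    let mut act = MyActor { ticks: 0 };
    let mut ctx = ContextItems { items: Vec::new(), next_handle: 0 };
    // A notify()-style item: run the "handler" once, then finish.
    ctx.spawn(Box::new(|a: &mut MyActor| {
        a.ticks += 1;
        true
    }));
    ctx.poll(&mut act);
    assert_eq!(act.ticks, 1);
    assert!(ctx.items.is_empty());
}
```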
## What happens when we get an actor's address and send a msg through it?
Every address of an Actor shares the same queue: sending a msg simply pushes the msg into the Actor's message_queue and then notifies the Actor.
```rust
pub struct AddressSender<A: Actor> {
    inner: Arc<Inner<A>>,
    // remaining fields omitted
}

impl<A: Actor> AddressSender<A> {
    pub fn send<M>(&self, msg: M) -> Result<Receiver<M::Result>, SendError<M>>
    where
        A: Handler<M>,
        A::Context: ToEnvelope<A, M>,
        M::Result: Send,
        M: Message + Send,
    {
        if !self.poll_unparked(false).is_ready() {
            return Err(SendError::Full(msg));
        }
        let park_self = match self.inc_num_messages() {
            Some(park_self) => park_self,
            None => return Err(SendError::Closed(msg)),
        };
        if park_self {
            self.park(true);
        }
        let (tx, rx) = sync_channel();
        let env = <A::Context as ToEnvelope<A, M>>::pack(msg, Some(tx));
        self.queue_push_and_signal(env);
        Ok(rx)
    }

    fn queue_push_and_signal(&self, msg: Envelope<A>) {
        self.inner.message_queue.push(msg);
        self.signal();
    }

    fn signal(&self) {
        let task = {
            let mut recv_task = self.inner.recv_task.lock();
            if recv_task.unparked {
                return;
            }
            recv_task.unparked = true;
            recv_task.task.take()
        };
        if let Some(task) = task {
            task.notify();
        }
    }
}
```
The remaining question is where the address comes from.
- The Address returned when the Actor starts:
```rust
impl<A> Context<A>
where
    A: Actor<Context = Self>,
{
    #[inline]
    pub(crate) fn new() -> Context<A> {
        let mb = Mailbox::default();
        Context {
            parts: ContextParts::new(mb.sender_producer()),
            mb: Some(mb),
        }
    }

    #[inline]
    pub fn run(self, act: A) -> Addr<A> {
        let fut = self.into_future(act);
        let addr = fut.address();
        Arbiter::spawn(fut);
        addr
    }
}

impl<A, C> ContextFut<A, C>
where
    C: AsyncContextParts<A>,
    A: Actor<Context = C>,
{
    #[inline]
    pub fn address(&self) -> Addr<A> {
        self.mailbox.address()
    }
}

impl<A> Mailbox<A>
where
    A: Actor,
    A::Context: AsyncContext<A>,
{
    pub fn address(&self) -> Addr<A> {
        Addr::new(self.msgs.sender())
    }
}
```
So the question becomes: what exactly is Mailbox's msgs?
```rust
impl<A> Default for Mailbox<A>
where
    A: Actor,
    A::Context: AsyncContext<A>,
{
    #[inline]
    fn default() -> Self {
        let (_, rx) = channel::channel(DEFAULT_CAPACITY);
        Mailbox { msgs: rx }
    }
}
```
channel itself is another fairly involved piece:
```rust
pub fn channel<A: Actor>(buffer: usize) -> (AddressSender<A>, AddressReceiver<A>) {
    // Check that the requested buffer size does not exceed the maximum buffer
    // size permitted by the system.
    assert!(buffer < MAX_BUFFER, "requested buffer size too large");
    let inner = Arc::new(Inner {
        buffer: AtomicUsize::new(buffer),
        state: AtomicUsize::new(INIT_STATE),
        message_queue: Queue::new(),
        parked_queue: Queue::new(),
        num_senders: AtomicUsize::new(1),
        recv_task: Mutex::new(ReceiverTask {
            unparked: false,
            task: None,
        }),
    });
    let tx = AddressSender {
        inner: Arc::clone(&inner),
        sender_task: Arc::new(Mutex::new(SenderTask::new())),
        maybe_parked: Arc::new(AtomicBool::new(false)),
    };
    let rx = AddressReceiver { inner };
    (tx, rx)
}
```
Here we can finally see where Inner gets created. Both the sender and the sender_producer below use the very same inner, which means that through an Actor's address() a Msg can be dropped straight into the Actor's Mailbox; when the Actor is polled, self.mailbox.poll() dispatches it to the real handler. (A small sketch of the Arc sharing follows the snippet below.)
```rust
impl<A: Actor> AddressReceiver<A> {
    pub fn sender(&self) -> AddressSender<A> {
        // this code same as Sender::clone
        let mut curr = self.inner.num_senders.load(SeqCst);
        loop {
            // If the maximum number of senders has been reached, then fail
            if curr == self.inner.max_senders() {
                panic!("cannot clone `Sender` -- too many outstanding senders");
            }
            let next = curr + 1;
            let actual = self.inner.num_senders.compare_and_swap(curr, next, SeqCst);
            // The ABA problem doesn't matter here. We only care that the
            // number of senders never exceeds the maximum.
            if actual == curr {
                return AddressSender {
                    inner: Arc::clone(&self.inner),
                    sender_task: Arc::new(Mutex::new(SenderTask::new())),
                    maybe_parked: Arc::new(AtomicBool::new(false)),
                };
            }
            curr = actual;
        }
    }

    /// Creates the sender producer.
    pub fn sender_producer(&self) -> AddressSenderProducer<A> {
        AddressSenderProducer {
            inner: self.inner.clone(),
        }
    }
}
```
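As a tiny illustration of that sharing (the types here are simplified stand-ins, not the real actix definitions), Arc::ptr_eq confirms that every handle produced this way points at one and the same Inner:

```rust
// Simplified stand-ins for Inner / AddressSender / AddressSenderProducer.
use std::sync::Arc;

struct Inner; // in actix this holds message_queue, recv_task, num_senders, ...

struct AddressSender {
    inner: Arc<Inner>,
}

struct AddressSenderProducer {
    inner: Arc<Inner>,
}

fn main() {
    let inner = Arc::new(Inner);
    let tx = AddressSender { inner: Arc::clone(&inner) };
    let producer = AddressSenderProducer { inner: Arc::clone(&inner) };
    // Both handles share the very same Inner, so a msg pushed through either
    // path lands in the same message_queue the mailbox polls.
    assert!(Arc::ptr_eq(&tx.inner, &producer.inner));
}
```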
## When is the task inside ReceiverTask set?
It is None at initialization and gets set in try_park when the AddressReceiver is polled (i.e. when the mailbox is polled), which means it is set the first time the Actor is polled, at Arbiter::spawn.
```rust
impl<A: Actor> Stream for AddressReceiver<A> {
    type Item = Envelope<A>;
    type Error = ();

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        loop {
            // Try to read a message off of the message queue.
            let msg = match self.next_message() {
                Async::Ready(msg) => msg,
                Async::NotReady => {
                    // There are no messages to read, in this case, attempt to
                    // park. The act of parking will verify that the channel is
                    // still empty after the park operation has completed.
                    match self.try_park() {
                        TryPark::Parked => {
                            // The task was parked, and the channel is still
                            // empty, return NotReady.
                            return Ok(Async::NotReady);
                        }
                        TryPark::NotEmpty => {
                            // A message has been sent while attempting to
                            // park. Loop again, the next iteration is
                            // guaranteed to get the message.
                            continue;
                        }
                    }
                }
            };
            // If there are any parked task handles in the parked queue, pop
            // one and unpark it.
            self.unpark_one();
            // Decrement number of messages
            self.dec_num_messages();
            // Return the message
            return Ok(Async::Ready(msg));
        }
    }
}

impl<A: Actor> AddressReceiver<A> {
    fn try_park(&self) -> TryPark {
        // First, track the task in the `recv_task` slot
        let mut recv_task = self.inner.recv_task.lock();
        if recv_task.unparked {
            // Consume the `unpark` signal without actually parking
            recv_task.unparked = false;
            return TryPark::NotEmpty;
        }
        recv_task.task = Some(task::current());
        TryPark::Parked
    }
}
```
At this point everything ties together: when we send a msg to an addr, we are really pushing an Envelope into the message_queue of that Actor's mailbox and then notifying the Actor's Task. Because the task is set the first time the Actor is polled, a later send can find the Actor's task and notify it.
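To close the loop, here is a minimal end-to-end sketch assuming the actix 0.7-era API these snippets appear to come from (MyActor and Ping are made up for illustration): start() goes through Context::run and hands back an Addr built from the mailbox's sender, and addr.send() goes through AddressSender::send, queue_push_and_signal and finally the mailbox poll described above.

```rust
extern crate actix; // actix = "0.7"
extern crate futures; // futures = "0.1"

use actix::prelude::*;
use futures::Future;

struct Ping;

impl Message for Ping {
    type Result = usize;
}

struct MyActor {
    count: usize,
}

impl Actor for MyActor {
    type Context = Context<Self>;
}

impl Handler<Ping> for MyActor {
    type Result = usize;

    fn handle(&mut self, _msg: Ping, _ctx: &mut Context<Self>) -> Self::Result {
        self.count += 1;
        self.count
    }
}

fn main() {
    let sys = System::new("example");

    // start() ends up in Context::run: it spawns the ContextFut on the
    // Arbiter and hands back the Addr built from the mailbox's sender.
    let addr = MyActor { count: 0 }.start();

    // send() pushes an Envelope into the mailbox's message_queue and
    // notifies the actor's task; the returned future resolves with the result.
    let res = addr.send(Ping);
    Arbiter::spawn(res.then(|r| {
        println!("count = {:?}", r);
        System::current().stop();
        Ok(())
    }));

    sys.run();
}
```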