请选择 进入手机版 | 继续访问电脑版

期翼嘻嘻即时通讯综合平台

 找回密码
 立即注册
查看: 336|回复: 0

WebRTC 有关 AsyncInvoker 执行流程分析 [复制链接]

Rank: 9Rank: 9Rank: 9

发表于 2020-5-23 20:48:17 |显示全部楼层
下面的代码是从别人摘过来的一个小例子,主要是方便说明 AsyncInvoker 运作机制的引言。

头文件 myclass.h

#include "webrtc/rtc_base/asyncinvoker.h"
#include <iostream>
#include <memory>

class MyClass
{
public:
    MyClass();
    void OneTask(rtc::Thread *thread, int x);
    void AnotherAsyncTask(int x);

private:
    std::unique_ptr<rtc::AsyncInvoker> invoker;
};


源码 myclass.cpp

#include "myclass.h"
MyClass::MyClass()
{
    invoker.reset(new rtc::AsyncInvoker());
    std::cout << "Main Thread ID:" << rtc::Thread::Current()->GetId() << std::endl;
}

void MyClass::OneTask(rtc::Thread *thread, int x)
{
    invoker->AsyncInvoke<void>(RTC_FROM_HERE, thread, rtc::Bind(&MyClass::AnotherAsyncTask, (MyClass*)this, x));
}


void MyClass::AnotherAsyncTask(int x)
{
    std::cout << "Worker Thread ID:" << rtc::Thread::Current()->GetId() << std::endl;
    std::cout << "Input Value Is:" << x << std::endl;
}

main 函数 main.cpp

#include "myclass.h"
int main()
{
    std::unique_ptr<rtc::Thread> myThread = rtc::Thread::Create();
    myThread->Start();
    MyClass *myClass = new MyClass;
    myClass->OneTask(myThread.get(), 10);
    Sleep(10000);

    return 0;
}

上述例子中, std::unique_ptr<rtc::Thread> myThread = rtc::Thread::Create();  这句创建了一个线程,这个线程是整个 AsyncInvoker 推动的引擎, 整个异步调用,就它驱动执行的。那么我们首先分析一下这个类:

class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue, public webrtc::TaskQueueBase

仔细分析它的源码其实发现 Thread  类的功能就是实现了一个线程驱动一个消息队列。

有关线程部分的操作都在 Thread 的源码中, 最重要的三个函数 Start, Stop, Run

有关消息队列的也很明显都在 MessageQueue 里,比较重要的几个函数 Post, PostDelayed, Get, Dispatch。

有了上述的概念,我们根据 webrtc 的源码进行追踪一下。

1. 上述例子里 myThread->Start();   启动一个线程,其实就是根据配置的参数调用系统 API 创建一个线程,线程创建成功进入 Thread:reRun 函数

2.  Thread:reRun 函数里面做了一些初始化,进而调用  Thread::Run 函数。

3.  Thread::Run 函数里更简单,直接调用 Thread:rocessMessages 函数

4.  Thread:rocessMessages 函数里面就调用了 MessageQueue::Get 和 MessageQueue:ispatch 函数

5.  MessageQueue::Get 里面其实就是从 msgq_ 队列里取消息,如果没有消息,则进行等待,如果有消息,就取出

6.  取出的消息交给 MessageQueue:ispatch 函数进行处理, 进一步深入到函数内部, 发现下面这句

pmsg->phandler->OnMessage(pmsg);

其实就是执行消息数据对象对应的 phandler 所指向的函数,这个消息数据从哪里来的呢? 这个无须多考虑,肯定是 AsyncInvoke 设置过过来的,而投递数据的接口根据需求有好几个,我们拿其中之一来分析即可,那就以 MessageQueue:ost 为代表吧。

7.  MessageQueue:ost 里

Message msg;
    msg.posted_from = posted_from;
    msg.phandler = phandler;
    msg.message_id = id;
    msg.pdata = pdata;
    if (time_sensitive) {
      msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
    }
    msgq_.push_back(msg);

看到这个消息数据的产生了吧,这个设置肯定是由 AsyncInvoke 设置过来的,那我们从上面例子里 AsyncInvoke 的接口往下追踪。

8.  AsyncInvoker::AsyncInvoke

invoker->AsyncInvoke<void>(RTC_FROM_HERE, thread, rtc::Bind(&MyClass::AnotherAsyncTask, (MyClass*)this, x));

从上述例子里面,我们知道这句调用会进入构造函数,而 AsyncInvoke 的构造函数内调用了, AsyncInvoke:oInvoke 。

而构造函数的两个重要的参数一个是 thread , 一个是 FunctorT ,不出意外这个 FunctorT 步骤 7 里面的  msg.phandler = phandler;  而 DoInvoke 的参数是 std::unique_ptr<AsyncClosure> closure(new FireAndForgetAsyncClosure<FunctorT>(this, std::forward<FunctorT>(functor)));

9.  我们继续追踪 AsyncInvoker:oInvoke

thread->ost(posted_from, this, id,
               new ScopedMessageData<AsyncClosure>(std::move(closure)));

到这里,我们就明白了,其实这步就是上述步骤  7,  MessageQueue:ost 函数,而 this 指针就是 AsyncInvoker 实例自己,上述第 6 步执行的 OnMessage 就是 AsyncInvoker::OnMessage

10. 我们进入 AsyncInvoker::OnMessage 里进行分析

ScopedMessageData<AsyncClosure>* data =
      static_cast<ScopedMessageData<AsyncClosure>*>(msg->pdata);
  // Execute the closure and trigger the return message if needed.
  data->inner_data().Execute();

这里的 Execute 就是上述第 8 步的 FireAndForgetAsyncClosure::Execute, 这个里面就是保存的 rtc::Bind(&MyClass::AnotherAsyncTask, (MyClass*)this, x) 中的设置的 MyClass::AnotherAsyncTask 类函数指针!!!



至此我们的异步调用分析完毕,就是 Thread 里完成的。


使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

Archiver|手机版|期翼嘻嘻论坛企业即时通讯综合平台 ( 京 ICP 备 10015350 )

GMT+8, 2020-8-16 03:53 , Processed in 0.060064 second(s), 9 queries .

Powered by Discuz! X2

© 2001-2011 Comsenz Inc.

回顶部