- 注册时间
- 2012-1-4
- 最后登录
- 2024-7-20
- 阅读权限
- 200
- 积分
- 4795
- 精华
- 1
- 帖子
- 436
|
发表于 2020-5-23 20:48:17
|显示全部楼层
下面的代码是从别人摘过来的一个小例子,主要是方便说明 AsyncInvoker 运作机制的引言。
头文件 myclass.h
#include "webrtc/rtc_base/asyncinvoker.h"
#include <iostream>
#include <memory>
class MyClass
{
public:
MyClass();
void OneTask(rtc::Thread *thread, int x);
void AnotherAsyncTask(int x);
private:
std::unique_ptr<rtc::AsyncInvoker> invoker;
};
源码 myclass.cpp
#include "myclass.h"
MyClass::MyClass()
{
invoker.reset(new rtc::AsyncInvoker());
std::cout << "Main Thread ID:" << rtc::Thread::Current()->GetId() << std::endl;
}
void MyClass::OneTask(rtc::Thread *thread, int x)
{
invoker->AsyncInvoke<void>(RTC_FROM_HERE, thread, rtc::Bind(&MyClass::AnotherAsyncTask, (MyClass*)this, x));
}
void MyClass::AnotherAsyncTask(int x)
{
std::cout << "Worker Thread ID:" << rtc::Thread::Current()->GetId() << std::endl;
std::cout << "Input Value Is:" << x << std::endl;
}
main 函数 main.cpp
#include "myclass.h"
int main()
{
std::unique_ptr<rtc::Thread> myThread = rtc::Thread::Create();
myThread->Start();
MyClass *myClass = new MyClass;
myClass->OneTask(myThread.get(), 10);
Sleep(10000);
return 0;
}
上述例子中, std::unique_ptr<rtc::Thread> myThread = rtc::Thread::Create(); 这句创建了一个线程,这个线程是整个 AsyncInvoker 推动的引擎, 整个异步调用,就它驱动执行的。那么我们首先分析一下这个类:
class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue, public webrtc::TaskQueueBase
仔细分析它的源码其实发现 Thread 类的功能就是实现了一个线程驱动一个消息队列。
有关线程部分的操作都在 Thread 的源码中, 最重要的三个函数 Start, Stop, Run
有关消息队列的也很明显都在 MessageQueue 里,比较重要的几个函数 Post, PostDelayed, Get, Dispatch。
有了上述的概念,我们根据 webrtc 的源码进行追踪一下。
1. 上述例子里 myThread->Start(); 启动一个线程,其实就是根据配置的参数调用系统 API 创建一个线程,线程创建成功进入 Thread:reRun 函数
2. Thread:reRun 函数里面做了一些初始化,进而调用 Thread::Run 函数。
3. Thread::Run 函数里更简单,直接调用 Thread:rocessMessages 函数
4. Thread:rocessMessages 函数里面就调用了 MessageQueue::Get 和 MessageQueue:ispatch 函数
5. MessageQueue::Get 里面其实就是从 msgq_ 队列里取消息,如果没有消息,则进行等待,如果有消息,就取出
6. 取出的消息交给 MessageQueue:ispatch 函数进行处理, 进一步深入到函数内部, 发现下面这句
pmsg->phandler->OnMessage(pmsg);
其实就是执行消息数据对象对应的 phandler 所指向的函数,这个消息数据从哪里来的呢? 这个无须多考虑,肯定是 AsyncInvoke 设置过过来的,而投递数据的接口根据需求有好几个,我们拿其中之一来分析即可,那就以 MessageQueue:ost 为代表吧。
7. MessageQueue:ost 里
Message msg;
msg.posted_from = posted_from;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
if (time_sensitive) {
msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
}
msgq_.push_back(msg);
看到这个消息数据的产生了吧,这个设置肯定是由 AsyncInvoke 设置过来的,那我们从上面例子里 AsyncInvoke 的接口往下追踪。
8. AsyncInvoker::AsyncInvoke
invoker->AsyncInvoke<void>(RTC_FROM_HERE, thread, rtc::Bind(&MyClass::AnotherAsyncTask, (MyClass*)this, x));
从上述例子里面,我们知道这句调用会进入构造函数,而 AsyncInvoke 的构造函数内调用了, AsyncInvoke:oInvoke 。
而构造函数的两个重要的参数一个是 thread , 一个是 FunctorT ,不出意外这个 FunctorT 步骤 7 里面的 msg.phandler = phandler; 而 DoInvoke 的参数是 std::unique_ptr<AsyncClosure> closure(new FireAndForgetAsyncClosure<FunctorT>(this, std::forward<FunctorT>(functor)));
9. 我们继续追踪 AsyncInvoker:oInvoke
thread->ost(posted_from, this, id,
new ScopedMessageData<AsyncClosure>(std::move(closure)));
到这里,我们就明白了,其实这步就是上述步骤 7, MessageQueue:ost 函数,而 this 指针就是 AsyncInvoker 实例自己,上述第 6 步执行的 OnMessage 就是 AsyncInvoker::OnMessage
10. 我们进入 AsyncInvoker::OnMessage 里进行分析
ScopedMessageData<AsyncClosure>* data =
static_cast<ScopedMessageData<AsyncClosure>*>(msg->pdata);
// Execute the closure and trigger the return message if needed.
data->inner_data().Execute();
这里的 Execute 就是上述第 8 步的 FireAndForgetAsyncClosure::Execute, 这个里面就是保存的 rtc::Bind(&MyClass::AnotherAsyncTask, (MyClass*)this, x) 中的设置的 MyClass::AnotherAsyncTask 类函数指针!!!
至此我们的异步调用分析完毕,就是 Thread 里完成的。
|
|