请选择 进入手机版 | 继续访问电脑版

期翼嘻嘻即时通讯综合平台

 找回密码
 立即注册
查看: 3795|回复: 0

webrtc 中有关 socket 运行机制以及 stun 收发过程 及 Candidates 生 [复制链接]

Rank: 9Rank: 9Rank: 9

发表于 2020-5-23 20:45:37 |显示全部楼层
----------------------------------------------------------------------------------------------------------------------------------------

一分钟快速搭建 rtmpd 服务器: https://blog.csdn.net/freeabc/article/details/102880984

软件下载地址: http://www.qiyicc.com/download/rtmpd.rar

github 地址:https://github.com/superconvert/smart_rtmpd

-----------------------------------------------------------------------------------------------------------------------------------------



webrtc 中有关 socket 运行机制以及 stun 收发过程 及 Candidates 生成流程分析
我写文章一般是两个思路:
1. 下一步要调用什么对象的方法
2.  这一步的对象,怎么关联到下一步的对象的流程分析
这一步的流程主要阐述怎么关联下一步的对象的流程分析,当然这一步做了什么具体的工作,不能
详细展示,否则,太庞大了,需要各位朋友针对重点的部分,自己揣摩了。

//*******************************************************************************************
//
// webrtc 内部很多创建 socket 的地方,这个需要调用类厂 BasicPacketSocketFactory , 下面
// 这一小段就是分析 BasicPacketSocketFactory 的创建,以及内部管理的 socket 的部分流程
//
//*******************************************************************************************
    AsyncPacketSocket* BasicPacketSocketFactory::CreateUdpSocket(
        const SocketAddress& address,
        uint16_t min_port,
        uint16_t max_port) {
      // UDP sockets are simple.
      // 参见下面的 SocketDispatcher
      AsyncSocket* socket =
          socket_factory()->CreateAsyncSocket(address.family(), SOCK_DGRAM);
      if (!socket) {
        return NULL;
      }
      //----------------------------------------------------------------------------
      // 这个 BindSocket 最终会调用系统的 bind
      //----------------------------------------------------------------------------
      if (BindSocket(socket, address, min_port, max_port) < 0) {
        RTC_LOG(LS_ERROR) << "UDP bind failed with error " << socket->GetError();
        delete socket;
        return NULL;
      }

      //----------------------------------------------------------------------------------------
      // 这个里面绑定了读和写事件到 AsyncUDPSocket::OnReadEvent , AsyncUDPSocket::OnWriteEvent
      //----------------------------------------------------------------------------------------  
      return new AsyncUDPSocket(socket);
    }

    1. 创建 BasicPacketSocketFactory
    ./pc/peer_connection_factory.cc
    BasicPacketSocketFactory 是 PeerConnectionFactory::Initialize() 中创建的
    default_socket_factory_.reset(new rtc::BasicPacketSocketFactory(network_thread_));     

    2.
    ./sdk/android/src/jni/pc/peer_connection_factory.cc
    而 network_thread_ 则是 接口 CreatePeerConnectionFactoryForJava 里的
    std::unique_ptr<rtc::Thread> network_thread = rtc::Thread::CreateWithSocketServer();   
    其实就是这个
    std::unique_ptr<Thread> Thread::CreateWithSocketServer() {
        return std::unique_ptr<Thread>(new Thread(SocketServer::CreateDefault()));
    }

    其实就是创建了 PhysicalSocketServer
    std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
    #if defined(__native_client__)
        return std::unique_ptr<SocketServer>(new rtc::NullSocketServer);
    #else
        return std::unique_ptr<SocketServer>(new rtc:hysicalSocketServer);
    #endif
    }

    Thread 继承于 class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue, public webrtc::TaskQueueBase
    构造函数 Thread(SocketServer* ss)把 ss 赋值给基类 MessageQueue,基类接口通过 socketserver 返回这个对象
    SocketServer* MessageQueue::socketserver() {
        return ss_;
    }

    上面的 BasicPacketSocketFactory::CreateUdpSocket 里的,这句话 socket_factory()->CreateAsyncSocket 其实就是调用
    ./rtc_base/physical_socket_server.cc
    AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) {        
        SocketDispatcher* dispatcher = new SocketDispatcher(this);
        // 这个里面通过 PhysicalSocket::Create 创建一个套接字
        if (dispatcher->Create(family, type)) {
            return dispatcher;
        } else {
            delete dispatcher;
            return nullptr;
        }
    }

//******************************************************************************
//
// 下面这段是讲述 socket 怎么接收数据的,和上述流程没任何关系
//
//******************************************************************************

    上述流程中,有一个这个函数调用,
    std::unique_ptr<Thread> Thread::CreateWithSocketServer() {
        return std::unique_ptr<Thread>(new Thread(SocketServer::CreateDefault()));
    }
    创建一个带线程的 socket    这个线程的 Run 如下:

    void Thread::Run() {
        ProcessMessages(kForever);
    }

    // 这个里面不断的 Get 最新的 message 进行处理
    bool Thread:rocessMessages(int cmsLoop) {
      // Using ProcessMessages with a custom clock for testing and a time greater
      // than 0 doesn't work, since it's not guaranteed to advance the custom
      // clock's time, and may get stuck in an infinite loop.
      RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 ||
                 cmsLoop == kForever);
      int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
      int cmsNext = cmsLoop;

      while (true) {
    #if defined(WEBRTC_MAC)
        ScopedAutoReleasePool pool;
    #endif
        Message msg;
        if (!Get(&msg, cmsNext))
          return !IsQuitting();
        Dispatch(&msg);

        if (cmsLoop != kForever) {
          cmsNext = static_cast<int>(TimeUntil(msEnd));
          if (cmsNext < 0)
            return true;
        }
      }
    }

    // 其实就是基类的 MessageQueue 的接口
    bool MessageQueue::Get(Message* pmsg, int cmsWait, bool process_io)
        // 看到这个 ss_ 了吗,就是 SocketServer::CreateDefault() 也就是 PhysicalSocketServer::Wait 接口
        if (!ss_->Wait(static_cast<int>(cmsNext), process_io))

    这个地方监听所有的 socket 操作,三个版本的都有 win, linux,随便找一个分析
    ./rtc_base/physical_socket_server.cc
    bool PhysicalSocketServer::Wait(int cmsWait, bool process_io)
        return WaitEpoll(cmsWait, signal_wakeup_);

    bool PhysicalSocketServer::WaitEpoll(int cmsWait)
        ProcessEvents(pdispatcher, readable, writable, check_error);

    static void ProcessEvents(Dispatcher* dispatcher, bool readable, bool writable, bool check_error)
        // 这里就是 SocketDispatcher -
        dispatcher->OnEvent(ff, errcode);

    void SocketDispatcher::OnEvent(uint32_t ff, int err)
        如果是读,这里假设是 UDP
        SignalReadEvent(this);

    ./rtc_base/async_udp_socket.cc
    void AsyncUDPSocket::OnReadEvent(AsyncSocket* socket)
        SignalReadPacket(this, buf_, static_cast<size_t>(len), remote_addr,
                   (timestamp > -1 ? timestamp : TimeMicros()));

    ./p2p/base/stun_port.cc
    void UDPPort::OnReadPacket(rtc::AsyncPacketSocket* socket,
        const char* data,
        size_t size,
        const rtc::SocketAddress& remote_addr,
        const int64_t& packet_time_us) {   
        RTC_DCHECK(socket == socket_);
        RTC_DCHECK(!remote_addr.IsUnresolvedIP());

        // Look for a response from the STUN server.
        // Even if the response doesn't match one of our outstanding requests, we
        // will eat it because it might be a response to a retransmitted packet, and
        // we already cleared the request when we got the first response.        
        if (server_addresses_.find(remote_addr) != server_addresses_.end()) {
            // 这是 stun 阶段接收包
            requests_.CheckResponse(data, size);
            return;
        }

        // 这是建立链接后接收包,参考 webrtc 的视频数据接收过程
        if (Connection* conn = GetConnection(remote_addr)) {
            conn->OnReadPacket(data, size, packet_time_us);
        } else {
            Port::OnReadPacket(data, size, remote_addr, PROTO_UDP);
        }
    }

//******************************************************************************
//
// 下面就分析了有关 webrtc stun 流程的部分
//
//******************************************************************************

1. 从这里开始分析,这个的调用参考 createPeerConnection 流程
JsepTransportController::MaybeStartGathering

2. 这个 ice_transport 就是 P2PTransportChannel 对象
dtls->ice_transport()->MaybeStartGathering();

3. 第一次创建流程
./p2p/base/p2p_transport_channel.cc
P2PTransportChannel::MaybeStartGathering
    //------------------------------------------------------------
    // 这个就是创建一个 PortAllocatorSession 并把信号挂接 P2PTransportChannel
    //------------------------------------------------------------
    AddAllocatorSession(allocator_->CreateSession(
          transport_name(), component(), ice_parameters_.ufrag,
          ice_parameters_.pwd));
    // 进行 PortAllocatorSession 接口的调用
    allocator_sessions_.back()->StartGettingPorts();

    3.1
    这个 allocator_ 来自下面的函数调用,我们看出 就是 JsepTransportController 的成员 port_allocator_
    ./pc/jsep_transport_controller.cc
    rtc::scoped_refptr<webrtc::IceTransportInterface>
    JsepTransportController::CreateIceTransport(const std::string& transport_name, bool rtcp) {
        int component = rtcp ? cricket::ICE_CANDIDATE_COMPONENT_RTCP : cricket::ICE_CANDIDATE_COMPONENT_RTP;

        IceTransportInit init;
        init.set_port_allocator(port_allocator_);
        init.set_async_resolver_factory(async_resolver_factory_);
        init.set_event_log(config_.event_log);
        return config_.ice_transport_factory->CreateIceTransport(transport_name, component, std::move(init));
    }

    ./api/ice_transport_factory.cc
    rtc::scoped_refptr<IceTransportInterface> CreateIceTransport(IceTransportInit init) {
        return new rtc::RefCountedObject<IceTransportWithTransportChannel>(
            std::make_unique<cricket:2PTransportChannel>(
            "", 0, init.port_allocator(), init.async_resolver_factory(), init.event_log()));
    }

    3.2
    我们跟踪一下 port_allocator_ 是在 JsepTransportController 初始化过程中传递过来的,我们分析 JsepTransportController
    初始化,发现其实就是来自 PeerConnection 的 port_allocator_ 对象

    ./pc/peer_connection.cc
    bool PeerConnection::Initialize(const PeerConnectionInterface::RTCConfiguration& configuration,   
        PeerConnectionDependencies dependencies)

      // 传递过来的。。。。。。
      port_allocator_ = std::move(dependencies.allocator);
      ... ...
      // 赋值给 JsepTransportController
      transport_controller_.reset(new JsepTransportController(
      signaling_thread(), network_thread(), port_allocator_.get(),
      async_resolver_factory_.get(), config));

    3.3
    我们分析 PeerConnection 的初始化过程中, port_allocator_ 的产生过程
    ./pc/peer_connection_factory.cc
    rtc::scoped_refptr<eerConnectionInterface>
    PeerConnectionFactory::CreatePeerConnection(const PeerConnectionInterface::RTCConfiguration& configuration,
        PeerConnectionDependencies dependencies)

        if (!dependencies.allocator) {
            rtc:acketSocketFactory* packet_socket_factory;
            if (dependencies.packet_socket_factory)
              packet_socket_factory = dependencies.packet_socket_factory.get();
            else
              // 这个就是 BasicPacketSocketFactory 参见上面的分析
              packet_socket_factory = default_socket_factory_.get();

            network_thread_->Invoke<void>(RTC_FROM_HERE, [this, &configuration,
                &dependencies,
                &packet_socket_factory]() {
                //------------------------------------------------------
                // 这个就是我们要追踪的 port_allocator_ !!!!!!!!!!!!
                //------------------------------------------------------
                dependencies.allocator = std::make_unique<cricket::BasicPortAllocator>(
                    default_network_manager_.get(), packet_socket_factory, configuration.turn_customizer);
            });
        }

        rtc::scoped_refptr<eerConnection> pc(new rtc::RefCountedObject<eerConnection>(this, std::move(event_log),
            std::move(call)));
        ActionsBeforeInitializeForTesting(pc);
        if (!pc->Initialize(configuration, std::move(dependencies))) {
            return nullptr;
        }

    上述函数被下面这个调用,我们发现这个里面 dependencies.allocator 为空,因此 port_allocator_ 是在上面的步骤中分配的
    ./sdk/android/src/jni/pc/peer_connection_factory.cc
    static jlong JNI_PeerConnectionFactory_CreatePeerConnection(
        JNIEnv* jni,
        jlong factory,
        const JavaParamRef<jobject>& j_rtc_config,
        const JavaParamRef<jobject>& j_constraints,
        jlong observer_p,
        const JavaParamRef<jobject>& j_sslCertificateVerifier)

        PeerConnectionDependencies peer_connection_dependencies(observer.get());
        if (!j_sslCertificateVerifier.is_null()) {
            peer_connection_dependencies.tls_cert_verifier = std::make_unique<SSLCertificateVerifierWrapper>(
                jni, j_sslCertificateVerifier);
        }
        rtc::scoped_refptr<eerConnectionInterface> pc =
            PeerConnectionFactoryFromJava(factory)->CreatePeerConnection(rtc_config,
            std::move(peer_connection_dependencies));


    3.4 我们继续分析 BasicPortAllocator 的接口 CreateSession
    ./p2p/base/port_allocator.cc
    std::unique_ptr<ortAllocatorSession> PortAllocator::CreateSession(
        const std::string& content_name,
        int component,
        const std::string& ice_ufrag,
        const std::string& ice_pwd) {
      CheckRunOnValidThreadAndInitialized();
      auto session = std::unique_ptr<ortAllocatorSession>(
          CreateSessionInternal(content_name, component, ice_ufrag, ice_pwd));
      session->SetCandidateFilter(candidate_filter());
      return session;
    }

    ./p2p/client/basic_port_allocator.cc   
    PortAllocatorSession* BasicPortAllocator::CreateSessionInternal(const std::string& content_name,
        int component, const std::string& ice_ufrag, const std::string& ice_pwd) {
        CheckRunOnValidThreadAndInitialized();
        PortAllocatorSession* session = new BasicPortAllocatorSession(this, content_name, component, ice_ufrag, ice_pwd);
        session->SignalIceRegathering.connect(this, &BasicPortAllocator::OnIceRegathering);
        return session;
    }

4.
./p2p/client/basic_port_allocator.cc
void BasicPortAllocatorSession::StartGettingPorts() {
  RTC_DCHECK_RUN_ON(network_thread_);
  state_ = SessionState::GATHERING;
  if (!socket_factory_) {
    owned_socket_factory_.reset(
        new rtc::BasicPacketSocketFactory(network_thread_));
    socket_factory_ = owned_socket_factory_.get();
  }

  network_thread_->ost(RTC_FROM_HERE, this, MSG_CONFIG_START);

  RTC_LOG(LS_INFO) << "Start getting ports with turn_port_prune_policy "
                   << turn_port_prune_policy_;
}

5.
void BasicPortAllocatorSession::OnMessage(rtc::Message* message) {
  switch (message->message_id) {
    case MSG_CONFIG_START:
      GetPortConfigurations();
      break;
    case MSG_CONFIG_READY:
      OnConfigReady(static_cast<ortConfiguration*>(message->pdata));
      break;
    case MSG_ALLOCATE:
      OnAllocate();
      break;
    case MSG_SEQUENCEOBJECTS_CREATED:
      OnAllocationSequenceObjectsCreated();
      break;
    case MSG_CONFIG_STOP:
      OnConfigStop();
      break;
    default:
      RTC_NOTREACHED();
  }
}

void BasicPortAllocatorSession::GetPortConfigurations() {
  RTC_DCHECK_RUN_ON(network_thread_);

  PortConfiguration* config =
      new PortConfiguration(allocator_->stun_servers(), username(), password());

  for (const RelayServerConfig& turn_server : allocator_->turn_servers()) {
    config->AddRelay(turn_server);
  }
  ConfigReady(config);
}

void BasicPortAllocatorSession::ConfigReady(PortConfiguration* config) {
  RTC_DCHECK_RUN_ON(network_thread_);
  network_thread_->ost(RTC_FROM_HERE, this, MSG_CONFIG_READY, config);
}

6.
void BasicPortAllocatorSession::OnConfigReady(PortConfiguration* config) {
  RTC_DCHECK_RUN_ON(network_thread_);
  if (config) {
    configs_.push_back(config);
  }

  AllocatePorts();
}

void BasicPortAllocatorSession::AllocatePorts() {
  RTC_DCHECK_RUN_ON(network_thread_);
  network_thread_->ost(RTC_FROM_HERE, this, MSG_ALLOCATE);
}

7.
void BasicPortAllocatorSession::OnAllocate() {
  RTC_DCHECK_RUN_ON(network_thread_);

  if (network_manager_started_ && !IsStopped()) {
    bool disable_equivalent_phases = true;
    DoAllocate(disable_equivalent_phases);
  }

  allocation_started_ = true;
}

void BasicPortAllocatorSession:oAllocate(bool disable_equivalent)

    AllocationSequence* sequence = new AllocationSequence(this, networks, config, sequence_flags);
    sequence->SignalPortAllocationComplete.connect(
        this, &BasicPortAllocatorSession::OnPortAllocationComplete);
    sequence->Init();
    sequence->Start();
    sequences_.push_back(sequence);
    done_signal_needed = true;

    network_thread_->Post(RTC_FROM_HERE, this, MSG_SEQUENCEOBJECTS_CREATED);

    7.1 sequence->Init() 这个里面创建了一个 UDP 套接字,并绑定读取接口
    ./p2p/client/basic_port_allocator.cc
    void AllocationSequence::Init() {
        if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET)) {
            udp_socket_.reset(session_->socket_factory()->CreateUdpSocket(
            rtc::SocketAddress(network_->GetBestIP(), 0),
            session_->allocator()->min_port(), session_->allocator()->max_port()));
            if (udp_socket_) {
                udp_socket_->SignalReadPacket.connect(this,
                    &AllocationSequence::OnReadPacket);
            }
            // Continuing if |udp_socket_| is NULL, as local TCP and RelayPort using TCP
            // are next available options to setup a communication channel.
        }
    }

    7.2 sequence->Start()
        session_->network_thread()->Post(RTC_FROM_HERE, this, MSG_ALLOCATION_PHASE);

    7.3
    void AllocationSequence::OnMessage(rtc::Message* msg) {
        RTC_DCHECK(rtc::Thread::Current() == session_->network_thread());
        RTC_DCHECK(msg->message_id == MSG_ALLOCATION_PHASE);

        const char* const PHASE_NAMES[kNumPhases] = {"Udp", "Relay", "Tcp"};

        // Perform all of the phases in the current step.
        RTC_LOG(LS_INFO) << network_->ToString() << ": Allocation Phase=" << PHASE_NAMES[phase_];

        switch (phase_) {
            case PHASE_UDP:
            CreateUDPPorts();
            CreateStunPorts();
            break;

        case PHASE_RELAY:
            CreateRelayPorts();
            break;

        case PHASE_TCP:
            CreateTCPPorts();
            state_ = kCompleted;
            break;

        default:
            RTC_NOTREACHED();
        }

        if (state() == kRunning) {
            ++phase_;
            session_->network_thread()->PostDelayed(RTC_FROM_HERE,
                session_->allocator()->step_delay(),
                this, MSG_ALLOCATION_PHASE);
        } else {
            // If all phases in AllocationSequence are completed, no allocation
            // steps needed further. Canceling  pending signal.
            session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE);
            SignalPortAllocationComplete(this);
        }
    }

    7.4
    void AllocationSequence::CreateUDPPorts()
        // 把上述创建的 udp_socket_
        port = UDPPort::Create(
        session_->network_thread(), session_->socket_factory(), network_,
        session_->allocator()->min_port(), session_->allocator()->max_port(),
        session_->username(), session_->password(),
        session_->allocator()->origin(), emit_local_candidate_for_anyaddress,
        session_->allocator()->stun_candidate_keepalive_interval());

        //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
        // 参见下面的 AddAllocatedPort 主要是 OnCandidateReady, OnPortComplete
        //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
        session_->AddAllocatedPort(port.release(), this, true);

        7.4.1
        ./p2p/base/stun_port.h
        static std::unique_ptr<UDPPort> Create(
            rtc::Thread* thread,
            rtc:acketSocketFactory* factory,
            rtc::Network* network,
            rtc::AsyncPacketSocket* socket,
            const std::string& username,
            const std::string& password,
            const std::string& origin,
            bool emit_local_for_anyaddress,
            absl:ptional<int> stun_keepalive_interval) {
            // Using `new` to access a non-public constructor.
            auto port = absl::WrapUnique(new UDPPort(thread, factory, network, socket,
                username, password, origin,
                emit_local_for_anyaddress));
            port->set_stun_keepalive_delay(stun_keepalive_interval);
            if (!port->Init()) {
                return nullptr;
            }
            return port;
        }

        7.4.2
        bool UDPPort::Init()
            stun_keepalive_lifetime_ = GetStunKeepaliveLifetime();
            if (!SharedSocket()) {
                RTC_DCHECK(socket_ == nullptr);
                //---------------------------------------------------------------------------------
                // 这里的 socket_factory 其实就是上面的 BasicPacketSocketFactory 的接口,创建 socket 并绑定
                //---------------------------------------------------------------------------------
                socket_ = socket_factory()->CreateUdpSocket(
                    rtc::SocketAddress(Network()->GetBestIP(), 0), min_port(), max_port());
                if (!socket_) {
                    RTC_LOG(LS_WARNING) << ToString() << ": UDP socket creation failed";
                    return false;
                }
                socket_->SignalReadPacket.connect(this, &UDPPort::OnReadPacket);
            }
            socket_->SignalSentPacket.connect(this, &UDPPort::OnSentPacket);
            socket_->SignalReadyToSend.connect(this, &UDPPort::OnReadyToSend);
            socket_->SignalAddressReady.connect(this, &UDPPort::OnLocalAddressReady);
            requests_.SignalSendPacket.connect(this, &UDPPort::OnSendPacket);

    7.5
    void BasicPortAllocatorSession::AddAllocatedPort(Port* port,
                                                 AllocationSequence* seq,
                                                 bool prepare_address) {
        RTC_DCHECK_RUN_ON(network_thread_);
        if (!port)
            return;

        RTC_LOG(LS_INFO) << "Adding allocated port for " << content_name();
        port->set_content_name(content_name());
        port->set_component(component());
        port->set_generation(generation());
        if (allocator_->proxy().type != rtc:ROXY_NONE)
            port->set_proxy(allocator_->user_agent(), allocator_->proxy());
        port->set_send_retransmit_count_attribute(
            (flags() & PORTALLOCATOR_ENABLE_STUN_RETRANSMIT_ATTRIBUTE) != 0);

        PortData data(port, seq);
        ports_.push_back(data);

        port->SignalCandidateReady.connect(
            this, &BasicPortAllocatorSession::OnCandidateReady);
        port->SignalCandidateError.connect(
            this, &BasicPortAllocatorSession::OnCandidateError);
        port->SignalPortComplete.connect(this,
            &BasicPortAllocatorSession::OnPortComplete);
        port->SignalDestroyed.connect(this,
            &BasicPortAllocatorSession::OnPortDestroyed);
        port->SignalPortError.connect(this, &BasicPortAllocatorSession::OnPortError);
            RTC_LOG(LS_INFO) << port->ToString() << ": Added port to allocator";

        if (prepare_address)
            port->PrepareAddress();
    }

    7.6
    void UDPPort:repareAddress() {
        RTC_DCHECK(requests_.empty());
        if (socket_->GetState() == rtc::AsyncPacketSocket::STATE_BOUND) {
            OnLocalAddressReady(socket_, socket_->GetLocalAddress());
        }
    }

    void UDPPort::OnLocalAddressReady(rtc::AsyncPacketSocket* socket,
                                  const rtc::SocketAddress& address) {
        // When adapter enumeration is disabled and binding to the any address, the
        // default local address will be issued as a candidate instead if
        // |emit_local_for_anyaddress| is true. This is to allow connectivity for
        // applications which absolutely requires a HOST candidate.
        rtc::SocketAddress addr = address;

        // If MaybeSetDefaultLocalAddress fails, we keep the "any" IP so that at
        // least the port is listening.
        MaybeSetDefaultLocalAddress(&addr);

        AddAddress(addr, addr, rtc::SocketAddress(), UDP_PROTOCOL_NAME, "", "",
                 LOCAL_PORT_TYPE, ICE_TYPE_PREFERENCE_HOST, 0, "", false);
        MaybePrepareStunCandidate();
    }

    void UDPPort::MaybePrepareStunCandidate() {
        // Sending binding request to the STUN server if address is available to
        // prepare STUN candidate.
        if (!server_addresses_.empty()) {
            SendStunBindingRequests();
        } else {
            // Port is done allocating candidates.
            MaybeSetPortCompleteOrError();
        }
    }

    7.7 这个地方发送 stun 的绑定命令到 stun 服务器
    void UDPPort::SendStunBindingRequests() {
        // We will keep pinging the stun server to make sure our NAT pin-hole stays
        // open until the deadline (specified in SendStunBindingRequest).
        RTC_DCHECK(requests_.empty());

        for (ServerAddresses::const_iterator it = server_addresses_.begin();
            it != server_addresses_.end(); ++it) {
            SendStunBindingRequest(*it);
        }
    }

    //--------------------------------------------------------------------------------
    // stun 服务器响应流程分析 socket 接收数据开始
    //--------------------------------------------------------------------------------
    所有的 stun 信令都在这个里面
    // 这个流程上面已经分析过了
    7.7.1
    bool MessageQueue::Get(Message* pmsg, int cmsWait, bool process_io)
        // 看到这个 ss_ 了吗,就是 SocketServer::CreateDefault() 也就是 PhysicalSocketServer::Wait 接口
        if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
    7.7.2   
    bool PhysicalSocketServer::Wait(int cmsWait, bool process_io)
        return WaitEpoll(cmsWait);
    7.7.3
    bool PhysicalSocketServer::WaitEpoll(int cmsWait)
        ProcessEvents(pdispatcher, readable, writable, check_error);
    7.7.4
    static void ProcessEvents(Dispatcher* dispatcher,
                          bool readable,
                          bool writable,
                          bool check_error)
        dispatcher->OnEvent(ff, errcode);        
    7.7.5
    ./rtc_base/physical_socket_server.cc
    void SocketDispatcher::OnEvent(uint32_t ff, int err)
        SignalReadEvent(this);
    7.7.6
    ./rtc_base/async_udp_socket.cc
    void AsyncUDPSocket::OnReadEvent(AsyncSocket* socket)
        SignalReadPacket(this, buf_, static_cast<size_t>(len), remote_addr,
                   (timestamp > -1 ? timestamp : TimeMicros()));
    7.7.7
    ./p2p/base/stun_port.cc
    void UDPPort::OnReadPacket(rtc::AsyncPacketSocket* socket,
        const char* data,
        size_t size,
        const rtc::SocketAddress& remote_addr,
        const int64_t& packet_time_us)
        requests_.CheckResponse(data, size);        
    OnReadPacket 绑定 AsyncUDPSocket::

    7.7.8            
    ./p2p/base/stun_request.cc
    bool StunRequestManager::CheckResponse(StunMessage* msg)
        request->OnResponse(msg);

    7.7.9
    ./p2p/base/stun_port.cc
    void StunBindingRequest::OnResponse(StunMessage* response)
         port_->OnStunBindingRequestSucceeded(this->Elapsed(), server_addr_, addr);

    7.8 stun 成功,则进入
    ./p2p/base/stun_port.cc
    void UDPPort::OnStunBindingRequestSucceeded(
        int rtt_ms,
        const rtc::SocketAddress& stun_server_addr,
        const rtc::SocketAddress& stun_reflected_addr)

        AddAddress(stun_reflected_addr, socket_->GetLocalAddress(), related_address,
               UDP_PROTOCOL_NAME, "", "", STUN_PORT_TYPE,
               ICE_TYPE_PREFERENCE_SRFLX, 0, url.str(), false);

    7.9        
    void Port::AddAddress(const rtc::SocketAddress& address,
                      const rtc::SocketAddress& base_address,
                      const rtc::SocketAddress& related_address,
                      const std::string& protocol,
                      const std::string& relay_protocol,
                      const std::string& tcptype,
                      const std::string& type,
                      uint32_t type_preference,
                      uint32_t relay_preference,
                      const std::string& url,
                      bool is_final)


        FinishAddingAddress(c, is_final);

    7.10
    void Port::FinishAddingAddress(const Candidate& c, bool is_final) {
        candidates_.push_back(c);
        SignalCandidateReady(this, c);

        PostAddAddress(is_final);
    }

    7.11
    ./p2p/client/basic_port_allocator.cc
    void BasicPortAllocatorSession::OnCandidateReady(Port* port, const Candidate& c)   
         SignalPortReady(this, port);  --->
            ./p2p/base/p2p_transport_channel.cc:168:  session->SignalPortReady.connect(this, &P2PTransportChannel::OnPortReady);
            void P2PTransportChannel::OnPortReady(PortAllocatorSession* session, PortInterface* port)
                CreateConnection(port, *iter, iter->origin_port());
            bool P2PTransportChannel::CreateConnections(const Candidate& remote_candidate, PortInterface* origin_port)
                if (CreateConnection(origin_port, remote_candidate, origin_port))
            bool P2PTransportChannel::CreateConnection(PortInterface* port, const Candidate& remote_candidate, PortInterface* origin_port)
                Connection* connection = port->CreateConnection(remote_candidate, origin);
                AddConnection(connection);
            void P2PTransportChannel::AddConnection(Connection* connection)
                connection->SignalReadPacket.connect(this, &P2PTransportChannel::OnReadPacket);               

         SignalCandidatesReady(this, candidates);

    7.12
    ./p2p/base/p2p_transport_channel.cc
    void P2PTransportChannel::OnCandidatesReady(
        PortAllocatorSession* session,
        const std::vector<Candidate>& candidates) {
        RTC_DCHECK_RUN_ON(network_thread_);
        for (size_t i = 0; i < candidates.size(); ++i) {
            SignalCandidateGathered(this, candidates);
        }
    }

    7.13
    ./pc/jsep_transport_controller.cc
    void JsepTransportController::OnTransportCandidateGathered_n(
        cricket::IceTransportInternal* transport,
        const cricket::Candidate& candidate) {
        RTC_DCHECK(network_thread_->IsCurrent());

        // We should never signal peer-reflexive candidates.
        if (candidate.type() == cricket:RFLX_PORT_TYPE) {
            RTC_NOTREACHED();
            return;
        }
        std::string transport_name = transport->transport_name();
        invoker_.AsyncInvoke<void>(
            RTC_FROM_HERE, signaling_thread_, [this, transport_name, candidate] {
            SignalIceCandidatesGathered(transport_name, {candidate});
        });
    }

    7.14
    ./pc/peer_connection.cc
    void PeerConnection::OnTransportControllerCandidatesGathered(
        const std::string& transport_name,
        const cricket::Candidates& candidates) {
        int sdp_mline_index;
        if (!GetLocalCandidateMediaIndex(transport_name, &sdp_mline_index)) {
            RTC_LOG(LS_ERROR)
                << "OnTransportControllerCandidatesGathered: content name "
                << transport_name << " not found";
            return;
        }

        for (cricket::Candidates::const_iterator citer = candidates.begin();
            citer != candidates.end(); ++citer) {
            // Use transport_name as the candidate media id.
            std::unique_ptr<JsepIceCandidate> candidate(
                new JsepIceCandidate(transport_name, sdp_mline_index, *citer));
            if (local_description()) {
                mutable_local_description()->AddCandidate(candidate.get());
            }
            OnIceCandidate(std::move(candidate));
        }
    }

    void PeerConnection::OnIceCandidate(
        std::unique_ptr<IceCandidateInterface> candidate) {
      if (IsClosed()) {
        return;
      }
      ReportIceCandidateCollected(candidate->candidate());
      // 这个地方回调到 Java 层的接口,并把自己的 candidate 发送给对方
      Observer()->OnIceCandidate(candidate.get());
    }


8.
void BasicPortAllocatorSession::OnAllocationSequenceObjectsCreated() {
  RTC_DCHECK_RUN_ON(network_thread_);
  allocation_sequences_created_ = true;
  // Send candidate allocation complete signal if we have no sequences.
  MaybeSignalCandidatesAllocationDone();
}

./p2p/base/p2p_transport_channel.cc
void P2PTransportChannel::OnCandidatesAllocationDone(
    PortAllocatorSession* session) {
  RTC_DCHECK_RUN_ON(network_thread_);
  if (config_.gather_continually()) {
    RTC_LOG(LS_INFO) << "P2PTransportChannel: " << transport_name()
                     << ", component " << component()
                     << " gathering complete, but using continual "
                        "gathering so not changing gathering state.";
    return;
  }
  gathering_state_ = kIceGatheringComplete;
  RTC_LOG(LS_INFO) << "P2PTransportChannel: " << transport_name()
                   << ", component " << component() << " gathering complete";
  SignalGatheringState(this);
}

./pc/jsep_transport_controller.cc
void JsepTransportController::OnTransportGatheringState_n(
    cricket::IceTransportInternal* transport) {
  RTC_DCHECK(network_thread_->IsCurrent());
  UpdateAggregateStates_n();
}


使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

蓝牙耳机无线高音质适用于苹果华强北pro2023年新款华为小米通用 【推荐理由】赠运费险 【券后价】89.00

Archiver|手机版|期翼嘻嘻论坛企业即时通讯综合平台 ( 京 ICP 备 10015350 )

GMT+8, 2024-4-18 21:13 , Processed in 0.222301 second(s), 10 queries .

Powered by Discuz! X2

© 2001-2011 Comsenz Inc.

回顶部