// Audio, Video, and FlexFEC receive streams are owned by the client that
// creates them.
包含 std::set<AudioReceiveStream*> audio_receive_streams_ RTC_GUARDED_BY(receive_crit_);
包含 std::set<VideoReceiveStream*> video_receive_streams_ RTC_GUARDED_BY(receive_crit_);
包含 std::map<std::string, AudioReceiveStream*> sync_stream_mapping_ RTC_GUARDED_BY(receive_crit_);
// TODO(nisse): Should eventually be injected at creation,
// with a single object in the bundled case.
包含 RtpStreamReceiverController audio_receiver_controller_;
包含 RtpStreamReceiverController video_receiver_controller_;
包含 std::map<uint32_t, ReceiveRtpConfig> receive_rtp_config_ RTC_GUARDED_BY(receive_crit_);
包含 std::unique_ptr<RWLockWrapper> send_crit_;
// Audio and Video send streams are owned by the client that creates them.
包含 std::map<uint32_t, AudioSendStream*> audio_send_ssrcs_ RTC_GUARDED_BY(send_crit_);
包含 std::map<uint32_t, VideoSendStream*> video_send_ssrcs_ RTC_GUARDED_BY(send_crit_);
-------------------------------------------------------------------------------------------------------
包含 std::set<VideoSendStream*> video_send_streams_ RTC_GUARDED_BY(send_crit_);
这个 Call::CreateVideoSendStream 接口内
VideoSendStream* send_stream = new VideoSendStream
video_send_streams_.insert(send_stream);
-------------------------------------------------------------------------------------------------------
包含 RtpStateMap suspended_audio_send_ssrcs_ RTC_GUARDED_BY(configuration_sequence_checker_);
包含 RtpStateMap suspended_video_send_ssrcs_ RTC_GUARDED_BY(configuration_sequence_checker_);
包含 RtpPayloadStateMap suspended_video_payload_states_ RTC_GUARDED_BY(configuration_sequence_checker_);
包含 webrtc::RtcEventLog* event_log_;
// The following members are only accessed (exclusively) from one thread and
// from the destructor, and therefore doesn't need any explicit
// synchronization.
包含 RateCounter received_bytes_per_second_counter_;
包含 RateCounter received_audio_bytes_per_second_counter_;
包含 RateCounter received_video_bytes_per_second_counter_;
包含 RateCounter received_rtcp_bytes_per_second_counter_;
包含 absl::optional<int64_t> first_received_rtp_audio_ms_;
包含 absl::optional<int64_t> last_received_rtp_audio_ms_;
包含 absl::optional<int64_t> first_received_rtp_video_ms_;
包含 absl::optional<int64_t> last_received_rtp_video_ms_;
包含 rtc::CriticalSection last_bandwidth_bps_crit_;
包含 uint32_t last_bandwidth_bps_ RTC_GUARDED_BY(&last_bandwidth_bps_crit_);
// TODO(holmer): Remove this lock once BitrateController no longer calls
// OnNetworkChanged from multiple threads.
包含 rtc::CriticalSection bitrate_crit_;
包含 uint32_t min_allocated_send_bitrate_bps_ RTC_GUARDED_BY(&worker_sequence_checker_);
包含 uint32_t configured_max_padding_bitrate_bps_ RTC_GUARDED_BY(&bitrate_crit_);
包含 AvgCounter estimated_send_bitrate_kbps_counter_ RTC_GUARDED_BY(&bitrate_crit_);
包含 AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(&bitrate_crit_);
包含 ReceiveSideCongestionController receive_side_cc_;
包含 const std::unique_ptr<ReceiveTimeCalculator> receive_time_calculator_;
包含 const std::unique_ptr<SendDelayStats> video_send_delay_stats_;
包含 const int64_t start_ms_;
// Caches transport_send_.get(), to avoid racing with destructor.
// Note that this is declared before transport_send_ to ensure that it is not
// invalidated until no more tasks can be running on the transport_send_ task
// queue.
------------------------------------------------------------------------
包含 RtpTransportControllerSendInterface* const transport_send_ptr_;
// Declared last since it will issue callbacks from a task queue. Declaring it
// last ensures that it is destroyed first and any running tasks are finished.
包含 std::unique_ptr<RtpTransportControllerSendInterface> transport_send_;
这个就是 RtpTransportControllerSend 对象
----------------------------------------------------------------------
包含 bool is_target_rate_observer_registered_ RTC_GUARDED_BY(&configuration_sequence_checker_) = false;
7.
VideoBroadcaster::OnFrame(const webrtc::VideoFrame& frame)
if (sink_pair.wants.black_frames) {
webrtc::VideoFrame black_frame = webrtc::VideoFrame::Builder()
.set_video_frame_buffer(GetBlackFrameBuffer(frame.width(), frame.height()))
.set_rotation(frame.rotation())
.set_timestamp_us(frame.timestamp_us())
.set_id(frame.id())
.build();
sink_pair.sink->OnFrame(black_frame);
} else if (!previous_frame_sent_to_all_sinks_ && frame.has_update_rect()) {
// Since last frame was not sent to some sinks, no reliable update
// information is available, so we need to clear the update rect.
webrtc::VideoFrame copy = frame;
copy.clear_update_rect();
sink_pair.sink->OnFrame(copy);
} else {
sink_pair.sink->OnFrame(frame);
}
}
7.1
sink_pairs() 中的 sinks_ 是由 AddOrUpdateSink 添加的 VideoStreamEncoder,具体流程参见《视频编码器的创建以及与源的关联》 11.5
这里的 transport 就是 RtpTransportControllerSend，这里会产生一个 RtpVideoSender 对象：
RtpTransportControllerSend::CreateRtpVideoSender
video_rtp_senders_.push_back(std::make_unique<RtpVideoSender>(clock_, suspended_ssrcs, states,
rtp_config, rtcp_report_interval_ms, send_transport, observers,
// TODO(holmer): Remove this circular dependency by injecting
// the parts of RtpTransportControllerSendInterface that are really used.
this, event_log, &retransmission_rate_limiter_, std::move(fec_controller),
frame_encryption_config.frame_encryptor, frame_encryption_config.crypto_options));
// Groups the packet history, egress, non-paced sender and packet generator
// objects declared below into a single struct.
struct RtpSenderContext {
// Built from an RtpRtcp::Configuration; the constructor body is not shown
// here. `explicit` prevents implicit conversion from a Configuration.
explicit RtpSenderContext(const RtpRtcp::Configuration& config);
// Storage of packets, for retransmissions and padding, if applicable.
RtpPacketHistory packet_history;
// Handles final time timestamping/stats/etc and handover to Transport.
RtpSenderEgress packet_sender;
// If no paced sender configured, this class will be used to pass packets
// from |packet_generator| to |packet_sender|.
RtpSenderEgress::NonPacedPacketSender non_paced_sender;
// Handles creation of RTP packets to be sent.
RTPSender packet_generator;
};
上述流程分析后我们得出下一步要进入 RTPSender::EnqueuePackets 函数了
10.
./modules/rtp_rtcp/source/rtp_sender.cc
上述步骤把发送的数据放到 RTPSender 的队列里了
void RTPSender::EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
RTC_DCHECK(!packets.empty());
int64_t now_ms = clock_->TimeInMilliseconds();
for (auto& packet : packets) {
RTC_DCHECK(packet);
RTC_CHECK(packet->packet_type().has_value())
<< "Packet type must be set before sending.";
if (packet->capture_time_ms() <= 0) {
packet->set_capture_time_ms(now_ms);
}
}
12.
// Wakes the process thread for |module_proxy_|, but only when a process thread
// is set and the pacer runs in PacingController::ProcessMode::kDynamic.
// Otherwise this is a no-op.
void PacedSender::MaybeWakupProcessThread() {
  // Tell the process thread to call our TimeUntilNextProcess() method to get
  // a new time for when to call Process().
  if (process_thread_ &&
      process_mode_ == PacingController::ProcessMode::kDynamic) {
    process_thread_->WakeUp(&module_proxy_);
  }
}
16.
./modules/pacing/packet_router.cc
// Hands |packet| to the RTP module registered for the packet's SSRC.
//
// Holds |modules_crit_| for the whole call. If no module is registered for
// the SSRC, or the module rejects the packet, a warning is logged and the
// packet is dropped. On success, a module that supports RTX payload padding
// is remembered in |last_send_module_|.
void PacketRouter::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
                              const PacedPacketInfo& cluster_info) {
  rtc::CritScope lock(&modules_crit_);

  // With the new pacer code path, transport sequence numbers are only set
  // here, on the pacer thread, so no atomics/synchronization are needed.
  const bool has_reserved_transport_seq =
      packet->IsExtensionReserved<TransportSequenceNumber>();
  if (has_reserved_transport_seq) {
    ++transport_seq_;
    packet->SetExtension<TransportSequenceNumber>(transport_seq_ & 0xFFFF);
  }

  // Look up the send module that owns this SSRC.
  uint32_t ssrc = packet->Ssrc();
  auto module_it = send_modules_map_.find(ssrc);
  if (module_it == send_modules_map_.end()) {
    RTC_LOG(LS_WARNING)
        << "Failed to send packet, matching RTP module not found "
           "or transport error. SSRC = "
        << packet->Ssrc() << ", sequence number " << packet->SequenceNumber();
    return;
  }

  RtpRtcp* rtp_module = module_it->second.first;
  const bool accepted = rtp_module->TrySendPacket(packet.get(), cluster_info);
  if (!accepted) {
    RTC_LOG(LS_WARNING) << "Failed to send packet, rejected by RTP module.";
    return;
  }

  // This is now the last module to send media; cache it for later use if it
  // has the properties needed for payload based padding.
  if (rtp_module->SupportsRtxPayloadPadding()) {
    last_send_module_ = rtp_module;
  }
}
// Both RTP and RTCP channels should be set, we can call SetInterface on
// the media channel and it can set network options.
media_channel_->SetInterface(this, media_transport_config);
}
26.
./p2p/base/dtls_transport.cc
int DtlsTransport::SendPacket(const char* data,
size_t size,
const rtc::PacketOptions& options,
int flags)
if (!dtls_active_) {
// Not doing DTLS.
return ice_transport_->SendPacket(data, size, options);
}
switch (dtls_state()) {
case DTLS_TRANSPORT_NEW:
// Can't send data until the connection is active.
// TODO(ekr@rtfm.com): assert here if dtls_ is NULL?
return -1;
case DTLS_TRANSPORT_CONNECTING:
// Can't send data until the connection is active.
return -1;
case DTLS_TRANSPORT_CONNECTED:
if (flags & PF_SRTP_BYPASS) {
RTC_DCHECK(!srtp_ciphers_.empty());
if (!IsRtpPacket(data, size)) {
return -1;
}
27.
./p2p/base/p2p_transport_channel.cc
// Sends |len| bytes starting at |data| over |selected_connection_|.
//
// Returns the value reported by the connection's Send(). Returns -1 with
// |error_| set to EINVAL when |flags| is non-zero, or to ENOTCONN when
// ReadyToSend() reports the selected connection is not usable. When Send()
// reports a non-positive value, |error_| is taken from the connection.
int P2PTransportChannel::SendPacket(const char* data,
                                    size_t len,
                                    const rtc::PacketOptions& options,
                                    int flags) {
  RTC_DCHECK_RUN_ON(network_thread_);

  // Any non-zero flag value is rejected.
  if (flags != 0) {
    error_ = EINVAL;
    return -1;
  }

  // If we don't think the connection is working yet, report ENOTCONN instead
  // of sending a packet that will probably be dropped.
  if (!ReadyToSend(selected_connection_)) {
    error_ = ENOTCONN;
    return -1;
  }

  last_sent_packet_id_ = options.packet_id;

  // Send with a copy of the caller's options, with the packet type set to
  // kData.
  rtc::PacketOptions data_options(options);
  data_options.info_signaled_after_sent.packet_type = rtc::PacketType::kData;

  const int bytes_sent = selected_connection_->Send(data, len, data_options);
  if (bytes_sent > 0) {
    return bytes_sent;
  }

  // Failure path: a result of exactly zero is not expected.
  RTC_DCHECK(bytes_sent < 0);
  error_ = selected_connection_->GetError();
  return bytes_sent;
}
33.
./rtc_base/physical_socket_server.cc
调用 SocketDispatcher 的基类函数
int PhysicalSocket::Send(const void* pv, size_t cb)
int sent = DoSend(
s_, reinterpret_cast<const char*>(pv), static_cast<int>(cb),
#if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
// Suppress SIGPIPE. Without this, attempting to send on a socket whose
// other end is closed will result in a SIGPIPE signal being raised to
// our process, which by default will terminate the process, which we
// don't want. By specifying this flag, we'll just get the error EPIPE
// instead and can handle the error gracefully.
MSG_NOSIGNAL
#else
0
#endif
);
34. 到这里就彻底发送出去了,调用了系统的 send 函数。。。。。。
// Forwards the buffer unchanged to the global ::send() and returns its result
// converted to int.
int PhysicalSocket::DoSend(SOCKET socket, const char* buf, int len, int flags) {
  const int bytes_sent = static_cast<int>(::send(socket, buf, len, flags));
  return bytes_sent;
}
这个 encoder_ 目前就是 VideoEncoderWrapper,这个是上层的一个硬编码代理,并没有实现如下接口:
// Inform the encoder when the packet loss rate changes.
//
// Input: - packet_loss_rate : The packet loss rate (0.0 to 1.0).
virtual void OnPacketLossRateUpdate(float packet_loss_rate);
// Inform the encoder when the round trip time changes.
//
// Input: - rtt_ms : The new RTT, in milliseconds.
virtual void OnRttUpdate(int64_t rtt_ms);