| Index: webrtc/p2p/stunprober/stunprober.cc
|
| diff --git a/webrtc/p2p/stunprober/stunprober.cc b/webrtc/p2p/stunprober/stunprober.cc
|
| index 8efe069b0b52f1841e59978a24023d087415e881..4a33444cc0545bcb7c78e90d9dc27a1ce66fcec0 100644
|
| --- a/webrtc/p2p/stunprober/stunprober.cc
|
| +++ b/webrtc/p2p/stunprober/stunprober.cc
|
| @@ -13,10 +13,15 @@
|
| #include <set>
|
| #include <string>
|
|
|
| +#include "webrtc/base/asyncpacketsocket.h"
|
| +#include "webrtc/base/asyncresolverinterface.h"
|
| #include "webrtc/base/bind.h"
|
| #include "webrtc/base/checks.h"
|
| #include "webrtc/base/helpers.h"
|
| +#include "webrtc/base/logging.h"
|
| #include "webrtc/base/timeutils.h"
|
| +#include "webrtc/base/thread.h"
|
| +#include "webrtc/p2p/base/packetsocketfactory.h"
|
| #include "webrtc/p2p/base/stun.h"
|
| #include "webrtc/p2p/stunprober/stunprober.h"
|
|
|
| @@ -24,20 +29,18 @@ namespace stunprober {
|
|
|
| namespace {
|
|
|
| +const int thread_wake_up_interval_ms = 5;
|
| +
|
| template <typename T>
|
| void IncrementCounterByAddress(std::map<T, int>* counter_per_ip, const T& ip) {
|
| counter_per_ip->insert(std::make_pair(ip, 0)).first->second++;
|
| }
|
|
|
| -bool behind_nat(NatType nat_type) {
|
| - return nat_type > stunprober::NATTYPE_NONE;
|
| -}
|
| -
|
| } // namespace
|
|
|
| // A requester tracks the requests and responses from a single socket to many
|
| // STUN servers
|
| -class StunProber::Requester {
|
| +class StunProber::Requester : public sigslot::has_slots<> {
|
| public:
|
| // Each Request maps to a request and response.
|
| struct Request {
|
| @@ -46,26 +49,20 @@ class StunProber::Requester {
|
| // Time the response was received.
|
| int64 received_time_ms = 0;
|
|
|
| - // See whether the observed address returned matches the
|
| - // local address as in StunProber.local_addr_.
|
| - bool behind_nat = false;
|
| -
|
| // Server reflexive address from STUN response for this given request.
|
| rtc::SocketAddress srflx_addr;
|
|
|
| rtc::IPAddress server_addr;
|
|
|
| int64 rtt() { return received_time_ms - sent_time_ms; }
|
| - void ProcessResponse(rtc::ByteBuffer* message,
|
| - int buf_len,
|
| - const rtc::IPAddress& local_addr);
|
| + void ProcessResponse(const char* buf, size_t buf_len);
|
| };
|
|
|
| // StunProber provides |server_ips| for Requester to probe. For shared
|
| // socket mode, it'll be all the resolved IP addresses. For non-shared mode,
|
| // it'll just be a single address.
|
| Requester(StunProber* prober,
|
| - ServerSocketInterface* socket,
|
| + rtc::AsyncPacketSocket* socket,
|
| const std::vector<rtc::SocketAddress>& server_ips);
|
| virtual ~Requester();
|
|
|
| @@ -74,11 +71,11 @@ class StunProber::Requester {
|
| // and move to the next one.
|
| void SendStunRequest();
|
|
|
| - void ReadStunResponse();
|
| -
|
| - // |result| is the positive return value from RecvFrom when data is
|
| - // available.
|
| - void OnStunResponseReceived(int result);
|
| + void OnStunResponseReceived(rtc::AsyncPacketSocket* socket,
|
| + const char* buf,
|
| + size_t size,
|
| + const rtc::SocketAddress& addr,
|
| + const rtc::PacketTime& time);
|
|
|
| const std::vector<Request*>& requests() { return requests_; }
|
|
|
| @@ -93,7 +90,7 @@ class StunProber::Requester {
|
| StunProber* prober_;
|
|
|
| // The socket for this session.
|
| - rtc::scoped_ptr<ServerSocketInterface> socket_;
|
| + rtc::scoped_ptr<rtc::AsyncPacketSocket> socket_;
|
|
|
| // Temporary SocketAddress and buffer for RecvFrom.
|
| rtc::SocketAddress addr_;
|
| @@ -111,13 +108,15 @@ class StunProber::Requester {
|
|
|
| StunProber::Requester::Requester(
|
| StunProber* prober,
|
| - ServerSocketInterface* socket,
|
| + rtc::AsyncPacketSocket* socket,
|
| const std::vector<rtc::SocketAddress>& server_ips)
|
| : prober_(prober),
|
| socket_(socket),
|
| response_packet_(new rtc::ByteBuffer(nullptr, kMaxUdpBufferSize)),
|
| server_ips_(server_ips),
|
| thread_checker_(prober->thread_checker_) {
|
| + socket_->SignalReadPacket.connect(
|
| + this, &StunProber::Requester::OnStunResponseReceived);
|
| }
|
|
|
| StunProber::Requester::~Requester() {
|
| @@ -145,7 +144,7 @@ void StunProber::Requester::SendStunRequest() {
|
| rtc::scoped_ptr<rtc::ByteBuffer> request_packet(
|
| new rtc::ByteBuffer(nullptr, kMaxUdpBufferSize));
|
| if (!message.Write(request_packet.get())) {
|
| - prober_->End(WRITE_FAILED, 0);
|
| + prober_->End(WRITE_FAILED);
|
| return;
|
| }
|
|
|
| @@ -155,48 +154,26 @@ void StunProber::Requester::SendStunRequest() {
|
| // The write must succeed immediately. Otherwise, the calculating of the STUN
|
| // request timing could become too complicated. Callback is ignored by passing
|
| // empty AsyncCallback.
|
| - int rv = socket_->SendTo(addr, const_cast<char*>(request_packet->Data()),
|
| - request_packet->Length(), AsyncCallback());
|
| + rtc::PacketOptions options;
|
| + int rv = socket_->SendTo(const_cast<char*>(request_packet->Data()),
|
| + request_packet->Length(), addr, options);
|
| if (rv < 0) {
|
| - prober_->End(WRITE_FAILED, rv);
|
| + prober_->End(WRITE_FAILED);
|
| return;
|
| }
|
|
|
| request.sent_time_ms = rtc::Time();
|
|
|
| - // Post a read waiting for response. For share mode, the subsequent read will
|
| - // be posted inside OnStunResponseReceived.
|
| - if (num_request_sent_ == 0) {
|
| - ReadStunResponse();
|
| - }
|
| -
|
| num_request_sent_++;
|
| DCHECK(static_cast<size_t>(num_request_sent_) <= server_ips_.size());
|
| }
|
|
|
| -void StunProber::Requester::ReadStunResponse() {
|
| - DCHECK(thread_checker_.CalledOnValidThread());
|
| - if (!socket_) {
|
| - return;
|
| - }
|
| -
|
| - int rv = socket_->RecvFrom(
|
| - response_packet_->ReserveWriteBuffer(kMaxUdpBufferSize),
|
| - kMaxUdpBufferSize, &addr_,
|
| - [this](int result) { this->OnStunResponseReceived(result); });
|
| - if (rv != SocketInterface::IO_PENDING) {
|
| - OnStunResponseReceived(rv);
|
| - }
|
| -}
|
| -
|
| -void StunProber::Requester::Request::ProcessResponse(
|
| - rtc::ByteBuffer* message,
|
| - int buf_len,
|
| - const rtc::IPAddress& local_addr) {
|
| +void StunProber::Requester::Request::ProcessResponse(const char* buf,
|
| + size_t buf_len) {
|
| int64 now = rtc::Time();
|
| -
|
| + rtc::ByteBuffer message(buf, buf_len);
|
| cricket::StunMessage stun_response;
|
| - if (!stun_response.Read(message)) {
|
| + if (!stun_response.Read(&message)) {
|
| // Invalid or incomplete STUN packet.
|
| received_time_ms = 0;
|
| return;
|
| @@ -218,40 +195,25 @@ void StunProber::Requester::Request::ProcessResponse(
|
| received_time_ms = now;
|
|
|
| srflx_addr = addr_attr->GetAddress();
|
| -
|
| - // Calculate behind_nat.
|
| - behind_nat = (srflx_addr.ipaddr() != local_addr);
|
| }
|
|
|
| -void StunProber::Requester::OnStunResponseReceived(int result) {
|
| +void StunProber::Requester::OnStunResponseReceived(
|
| + rtc::AsyncPacketSocket* socket,
|
| + const char* buf,
|
| + size_t size,
|
| + const rtc::SocketAddress& addr,
|
| + const rtc::PacketTime& time) {
|
| DCHECK(thread_checker_.CalledOnValidThread());
|
| DCHECK(socket_);
|
| -
|
| - if (result < 0) {
|
| - // Something is wrong, finish the test.
|
| - prober_->End(READ_FAILED, result);
|
| - return;
|
| - }
|
| -
|
| - Request* request = GetRequestByAddress(addr_.ipaddr());
|
| + Request* request = GetRequestByAddress(addr.ipaddr());
|
| if (!request) {
|
| // Something is wrong, finish the test.
|
| - prober_->End(GENERIC_FAILURE, result);
|
| + prober_->End(GENERIC_FAILURE);
|
| return;
|
| }
|
|
|
| num_response_received_++;
|
| -
|
| - // Resize will set the end_ to indicate that there are data available in this
|
| - // ByteBuffer.
|
| - response_packet_->Resize(result);
|
| - request->ProcessResponse(response_packet_.get(), result,
|
| - prober_->local_addr_);
|
| -
|
| - if (static_cast<size_t>(num_response_received_) < server_ips_.size()) {
|
| - // Post another read response.
|
| - ReadStunResponse();
|
| - }
|
| + request->ProcessResponse(buf, size);
|
| }
|
|
|
| StunProber::Requester::Request* StunProber::Requester::GetRequestByAddress(
|
| @@ -266,13 +228,13 @@ StunProber::Requester::Request* StunProber::Requester::GetRequestByAddress(
|
| return nullptr;
|
| }
|
|
|
| -StunProber::StunProber(HostNameResolverInterface* host_name_resolver,
|
| - SocketFactoryInterface* socket_factory,
|
| - TaskRunnerInterface* task_runner)
|
| +StunProber::StunProber(rtc::PacketSocketFactory* socket_factory,
|
| + rtc::Thread* thread,
|
| + const rtc::NetworkManager::NetworkList& networks)
|
| : interval_ms_(0),
|
| socket_factory_(socket_factory),
|
| - resolver_(host_name_resolver),
|
| - task_runner_(task_runner) {
|
| + thread_(thread),
|
| + networks_(networks) {
|
| }
|
|
|
| StunProber::~StunProber() {
|
| @@ -281,6 +243,11 @@ StunProber::~StunProber() {
|
| delete req;
|
| }
|
| }
|
| + for (auto s : sockets_) {
|
| + if (s) {
|
| + delete s;
|
| + }
|
| + }
|
| }
|
|
|
| bool StunProber::Start(const std::vector<rtc::SocketAddress>& servers,
|
| @@ -301,92 +268,98 @@ bool StunProber::Start(const std::vector<rtc::SocketAddress>& servers,
|
| timeout_ms_ = timeout_ms;
|
| servers_ = servers;
|
| finished_callback_ = callback;
|
| - resolver_->Resolve(servers_[0], &resolved_ips_,
|
| - [this](int result) { this->OnServerResolved(0, result); });
|
| + return ResolveServerName(servers_.back());
|
| +}
|
| +
|
| +bool StunProber::ResolveServerName(const rtc::SocketAddress& addr) {
|
| + rtc::AsyncResolverInterface* resolver =
|
| + socket_factory_->CreateAsyncResolver();
|
| + if (!resolver) {
|
| + return false;
|
| + }
|
| + resolver->SignalDone.connect(this, &StunProber::OnServerResolved);
|
| + resolver->Start(addr);
|
| return true;
|
| }
|
|
|
| -void StunProber::OnServerResolved(int index, int result) {
|
| +void StunProber::OnSocketReady(rtc::AsyncPacketSocket* socket,
|
| + const rtc::SocketAddress& addr) {
|
| + total_ready_sockets_++;
|
| + if (total_ready_sockets_ == total_socket_required()) {
|
| + MaybeScheduleStunRequests();
|
| + }
|
| +}
|
| +
|
| +void StunProber::OnServerResolved(rtc::AsyncResolverInterface* resolver) {
|
| DCHECK(thread_checker_.CalledOnValidThread());
|
|
|
| - if (result == 0) {
|
| - all_servers_ips_.insert(all_servers_ips_.end(), resolved_ips_.begin(),
|
| - resolved_ips_.end());
|
| - resolved_ips_.clear();
|
| + if (resolver->GetError() == 0) {
|
| + rtc::SocketAddress addr(resolver->address().ipaddr(),
|
| + resolver->address().port());
|
| + all_servers_addrs_.push_back(addr);
|
| }
|
|
|
| - index++;
|
| + // Deletion of AsyncResolverInterface can't be done in OnResolveResult which
|
| + // handles SignalDone.
|
| + invoker_.AsyncInvoke<void>(
|
| + thread_,
|
| + rtc::Bind(&rtc::AsyncResolverInterface::Destroy, resolver, false));
|
| + servers_.pop_back();
|
|
|
| - if (static_cast<size_t>(index) < servers_.size()) {
|
| - resolver_->Resolve(
|
| - servers_[index], &resolved_ips_,
|
| - [this, index](int result) { this->OnServerResolved(index, result); });
|
| + if (servers_.size()) {
|
| + if (!ResolveServerName(servers_.back())) {
|
| + End(RESOLVE_FAILED);
|
| + }
|
| return;
|
| }
|
|
|
| - if (all_servers_ips_.size() == 0) {
|
| - End(RESOLVE_FAILED, result);
|
| + if (all_servers_addrs_.size() == 0) {
|
| + End(RESOLVE_FAILED);
|
| return;
|
| }
|
|
|
| // Dedupe.
|
| - std::set<rtc::SocketAddress> addrs(all_servers_ips_.begin(),
|
| - all_servers_ips_.end());
|
| - all_servers_ips_.assign(addrs.begin(), addrs.end());
|
| -
|
| - rtc::IPAddress addr;
|
| - if (GetLocalAddress(&addr) != 0) {
|
| - End(GENERIC_FAILURE, result);
|
| - return;
|
| - }
|
| -
|
| - socket_factory_->Prepare(GetTotalClientSockets(), GetTotalServerSockets(),
|
| - [this](int result) {
|
| - if (result == 0) {
|
| - this->MaybeScheduleStunRequests();
|
| - }
|
| - });
|
| -}
|
| -
|
| -int StunProber::GetLocalAddress(rtc::IPAddress* addr) {
|
| - DCHECK(thread_checker_.CalledOnValidThread());
|
| - if (local_addr_.family() == AF_UNSPEC) {
|
| - rtc::SocketAddress sock_addr;
|
| - rtc::scoped_ptr<ClientSocketInterface> socket(
|
| - socket_factory_->CreateClientSocket());
|
| - int rv = socket->Connect(all_servers_ips_[0]);
|
| - if (rv != SUCCESS) {
|
| - End(GENERIC_FAILURE, rv);
|
| - return rv;
|
| + std::set<rtc::SocketAddress> addrs(all_servers_addrs_.begin(),
|
| + all_servers_addrs_.end());
|
| + all_servers_addrs_.assign(addrs.begin(), addrs.end());
|
| +
|
| + // Prepare all the sockets beforehand. All of them will bind to "any" address.
|
| + while (sockets_.size() < total_socket_required()) {
|
| + rtc::scoped_ptr<rtc::AsyncPacketSocket> socket(
|
| + socket_factory_->CreateUdpSocket(rtc::SocketAddress(INADDR_ANY, 0), 0,
|
| + 0));
|
| + if (!socket) {
|
| + End(GENERIC_FAILURE);
|
| + return;
|
| }
|
| - rv = socket->GetLocalAddress(&sock_addr);
|
| - if (rv != SUCCESS) {
|
| - End(GENERIC_FAILURE, rv);
|
| - return rv;
|
| + // Chrome and WebRTC behave differently in terms of the state of a socket
|
| + // once returned from PacketSocketFactory::CreateUdpSocket.
|
| + if (socket->GetState() == rtc::AsyncPacketSocket::STATE_BINDING) {
|
| + socket->SignalAddressReady.connect(this, &StunProber::OnSocketReady);
|
| + } else {
|
| + OnSocketReady(socket.get(), rtc::SocketAddress(INADDR_ANY, 0));
|
| }
|
| - local_addr_ = sock_addr.ipaddr();
|
| - socket->Close();
|
| + sockets_.push_back(socket.release());
|
| }
|
| - *addr = local_addr_;
|
| - return 0;
|
| }
|
|
|
| StunProber::Requester* StunProber::CreateRequester() {
|
| DCHECK(thread_checker_.CalledOnValidThread());
|
| - rtc::scoped_ptr<ServerSocketInterface> socket(
|
| - socket_factory_->CreateServerSocket(kMaxUdpBufferSize,
|
| - kMaxUdpBufferSize));
|
| - if (!socket) {
|
| + if (!sockets_.size()) {
|
| return nullptr;
|
| }
|
| + StunProber::Requester* requester;
|
| if (shared_socket_mode_) {
|
| - return new Requester(this, socket.release(), all_servers_ips_);
|
| + requester = new Requester(this, sockets_.back(), all_servers_addrs_);
|
| } else {
|
| std::vector<rtc::SocketAddress> server_ip;
|
| server_ip.push_back(
|
| - all_servers_ips_[(num_request_sent_ % all_servers_ips_.size())]);
|
| - return new Requester(this, socket.release(), server_ip);
|
| + all_servers_addrs_[(num_request_sent_ % all_servers_addrs_.size())]);
|
| + requester = new Requester(this, sockets_.back(), server_ip);
|
| }
|
| +
|
| + sockets_.pop_back();
|
| + return requester;
|
| }
|
|
|
| bool StunProber::SendNextRequest() {
|
| @@ -407,19 +380,20 @@ void StunProber::MaybeScheduleStunRequests() {
|
| uint32 now = rtc::Time();
|
|
|
| if (Done()) {
|
| - task_runner_->PostTask(rtc::Bind(&StunProber::End, this, SUCCESS, 0),
|
| - timeout_ms_);
|
| + invoker_.AsyncInvokeDelayed<void>(
|
| + thread_, rtc::Bind(&StunProber::End, this, SUCCESS), timeout_ms_);
|
| return;
|
| }
|
| - if (now >= next_request_time_ms_) {
|
| + if ((now + (thread_wake_up_interval_ms / 2)) >= next_request_time_ms_) {
|
| if (!SendNextRequest()) {
|
| - End(GENERIC_FAILURE, 0);
|
| + End(GENERIC_FAILURE);
|
| return;
|
| }
|
| next_request_time_ms_ = now + interval_ms_;
|
| }
|
| - task_runner_->PostTask(
|
| - rtc::Bind(&StunProber::MaybeScheduleStunRequests, this), 1 /* ms */);
|
| + invoker_.AsyncInvokeDelayed<void>(
|
| + thread_, rtc::Bind(&StunProber::MaybeScheduleStunRequests, this),
|
| + thread_wake_up_interval_ms /* ms */);
|
| }
|
|
|
| bool StunProber::GetStats(StunProber::Stats* prob_stats) const {
|
| @@ -464,14 +438,7 @@ bool StunProber::GetStats(StunProber::Stats* prob_stats) const {
|
| IncrementCounterByAddress(&num_response_per_server, request->server_addr);
|
| IncrementCounterByAddress(&num_response_per_srflx_addr,
|
| request->srflx_addr);
|
| -
|
| rtt_sum += request->rtt();
|
| - if (nat_type == NATTYPE_INVALID) {
|
| - nat_type = request->behind_nat ? NATTYPE_UNKNOWN : NATTYPE_NONE;
|
| - } else if (behind_nat(nat_type) != request->behind_nat) {
|
| - // Detect the inconsistency in NAT presence.
|
| - return false;
|
| - }
|
| stats.srflx_addrs.insert(request->srflx_addr.ToString());
|
| srflx_ips.insert(request->srflx_addr.ipaddr());
|
| }
|
| @@ -505,18 +472,34 @@ bool StunProber::GetStats(StunProber::Stats* prob_stats) const {
|
| return false;
|
| }
|
|
|
| - stats.nat_type = nat_type;
|
| -
|
| // Shared mode is only true if we use the shared socket and there are more
|
| // than 1 responding servers.
|
| stats.shared_socket_mode =
|
| shared_socket_mode_ && (num_server_ip_with_response > 1);
|
|
|
| - if (stats.shared_socket_mode && nat_type == NATTYPE_UNKNOWN) {
|
| - stats.nat_type = NATTYPE_NON_SYMMETRIC;
|
| + if (stats.shared_socket_mode && nat_type == NATTYPE_INVALID) {
|
| + nat_type = NATTYPE_NON_SYMMETRIC;
|
| }
|
|
|
| - stats.host_ip = local_addr_.ToString();
|
| + // If we could find a local IP matching srflx, we're not behind a NAT.
|
| + rtc::SocketAddress srflx_addr;
|
| + if (!srflx_addr.FromString(*(stats.srflx_addrs.begin()))) {
|
| + return false;
|
| + }
|
| + for (const auto& net : networks_) {
|
| + if (srflx_addr.ipaddr() == net->GetBestIP()) {
|
| + nat_type = stunprober::NATTYPE_NONE;
|
| + stats.host_ip = net->GetBestIP().ToString();
|
| + break;
|
| + }
|
| + }
|
| +
|
| + // Finally, we know we're behind a NAT but can't determine which type it is.
|
| + if (nat_type == NATTYPE_INVALID) {
|
| + nat_type = NATTYPE_UNKNOWN;
|
| + }
|
| +
|
| + stats.nat_type = nat_type;
|
| stats.num_request_sent = num_sent;
|
| stats.num_response_received = num_received;
|
| stats.target_request_interval_ns = interval_ms_ * 1000;
|
| @@ -538,14 +521,14 @@ bool StunProber::GetStats(StunProber::Stats* prob_stats) const {
|
| return true;
|
| }
|
|
|
| -void StunProber::End(StunProber::Status status, int result) {
|
| +void StunProber::End(StunProber::Status status) {
|
| DCHECK(thread_checker_.CalledOnValidThread());
|
| if (!finished_callback_.empty()) {
|
| AsyncCallback callback = finished_callback_;
|
| finished_callback_ = AsyncCallback();
|
|
|
| // Callback at the last since the prober might be deleted in the callback.
|
| - callback(status);
|
| + callback(this, status);
|
| }
|
| }
|
|
|
|
|