Index: webrtc/p2p/stunprober/stunprober.cc |
diff --git a/webrtc/p2p/stunprober/stunprober.cc b/webrtc/p2p/stunprober/stunprober.cc |
index 8efe069b0b52f1841e59978a24023d087415e881..4a33444cc0545bcb7c78e90d9dc27a1ce66fcec0 100644 |
--- a/webrtc/p2p/stunprober/stunprober.cc |
+++ b/webrtc/p2p/stunprober/stunprober.cc |
@@ -13,10 +13,15 @@ |
#include <set> |
#include <string> |
+#include "webrtc/base/asyncpacketsocket.h" |
+#include "webrtc/base/asyncresolverinterface.h" |
#include "webrtc/base/bind.h" |
#include "webrtc/base/checks.h" |
#include "webrtc/base/helpers.h" |
+#include "webrtc/base/logging.h" |
#include "webrtc/base/timeutils.h" |
+#include "webrtc/base/thread.h" |
+#include "webrtc/p2p/base/packetsocketfactory.h" |
#include "webrtc/p2p/base/stun.h" |
#include "webrtc/p2p/stunprober/stunprober.h" |
@@ -24,20 +29,18 @@ namespace stunprober { |
namespace { |
+const int thread_wake_up_interval_ms = 5; |
+ |
template <typename T> |
void IncrementCounterByAddress(std::map<T, int>* counter_per_ip, const T& ip) { |
counter_per_ip->insert(std::make_pair(ip, 0)).first->second++; |
} |
-bool behind_nat(NatType nat_type) { |
- return nat_type > stunprober::NATTYPE_NONE; |
-} |
- |
} // namespace |
// A requester tracks the requests and responses from a single socket to many |
// STUN servers |
-class StunProber::Requester { |
+class StunProber::Requester : public sigslot::has_slots<> { |
public: |
// Each Request maps to a request and response. |
struct Request { |
@@ -46,26 +49,20 @@ class StunProber::Requester { |
// Time the response was received. |
int64 received_time_ms = 0; |
- // See whether the observed address returned matches the |
- // local address as in StunProber.local_addr_. |
- bool behind_nat = false; |
- |
// Server reflexive address from STUN response for this given request. |
rtc::SocketAddress srflx_addr; |
rtc::IPAddress server_addr; |
int64 rtt() { return received_time_ms - sent_time_ms; } |
- void ProcessResponse(rtc::ByteBuffer* message, |
- int buf_len, |
- const rtc::IPAddress& local_addr); |
+ void ProcessResponse(const char* buf, size_t buf_len); |
}; |
// StunProber provides |server_ips| for Requester to probe. For shared |
// socket mode, it'll be all the resolved IP addresses. For non-shared mode, |
// it'll just be a single address. |
Requester(StunProber* prober, |
- ServerSocketInterface* socket, |
+ rtc::AsyncPacketSocket* socket, |
const std::vector<rtc::SocketAddress>& server_ips); |
virtual ~Requester(); |
@@ -74,11 +71,11 @@ class StunProber::Requester { |
// and move to the next one. |
void SendStunRequest(); |
- void ReadStunResponse(); |
- |
- // |result| is the positive return value from RecvFrom when data is |
- // available. |
- void OnStunResponseReceived(int result); |
+ void OnStunResponseReceived(rtc::AsyncPacketSocket* socket, |
+ const char* buf, |
+ size_t size, |
+ const rtc::SocketAddress& addr, |
+ const rtc::PacketTime& time); |
const std::vector<Request*>& requests() { return requests_; } |
@@ -93,7 +90,7 @@ class StunProber::Requester { |
StunProber* prober_; |
// The socket for this session. |
- rtc::scoped_ptr<ServerSocketInterface> socket_; |
+ rtc::scoped_ptr<rtc::AsyncPacketSocket> socket_; |
// Temporary SocketAddress and buffer for RecvFrom. |
rtc::SocketAddress addr_; |
@@ -111,13 +108,15 @@ class StunProber::Requester { |
StunProber::Requester::Requester( |
StunProber* prober, |
- ServerSocketInterface* socket, |
+ rtc::AsyncPacketSocket* socket, |
const std::vector<rtc::SocketAddress>& server_ips) |
: prober_(prober), |
socket_(socket), |
response_packet_(new rtc::ByteBuffer(nullptr, kMaxUdpBufferSize)), |
server_ips_(server_ips), |
thread_checker_(prober->thread_checker_) { |
+ socket_->SignalReadPacket.connect( |
+ this, &StunProber::Requester::OnStunResponseReceived); |
} |
StunProber::Requester::~Requester() { |
@@ -145,7 +144,7 @@ void StunProber::Requester::SendStunRequest() { |
rtc::scoped_ptr<rtc::ByteBuffer> request_packet( |
new rtc::ByteBuffer(nullptr, kMaxUdpBufferSize)); |
if (!message.Write(request_packet.get())) { |
- prober_->End(WRITE_FAILED, 0); |
+ prober_->End(WRITE_FAILED); |
return; |
} |
@@ -155,48 +154,26 @@ void StunProber::Requester::SendStunRequest() { |
// The write must succeed immediately. Otherwise, the calculating of the STUN |
// request timing could become too complicated. Callback is ignored by passing |
// empty AsyncCallback. |
- int rv = socket_->SendTo(addr, const_cast<char*>(request_packet->Data()), |
- request_packet->Length(), AsyncCallback()); |
+ rtc::PacketOptions options; |
+ int rv = socket_->SendTo(const_cast<char*>(request_packet->Data()), |
+ request_packet->Length(), addr, options); |
if (rv < 0) { |
- prober_->End(WRITE_FAILED, rv); |
+ prober_->End(WRITE_FAILED); |
return; |
} |
request.sent_time_ms = rtc::Time(); |
- // Post a read waiting for response. For share mode, the subsequent read will |
- // be posted inside OnStunResponseReceived. |
- if (num_request_sent_ == 0) { |
- ReadStunResponse(); |
- } |
- |
num_request_sent_++; |
DCHECK(static_cast<size_t>(num_request_sent_) <= server_ips_.size()); |
} |
-void StunProber::Requester::ReadStunResponse() { |
- DCHECK(thread_checker_.CalledOnValidThread()); |
- if (!socket_) { |
- return; |
- } |
- |
- int rv = socket_->RecvFrom( |
- response_packet_->ReserveWriteBuffer(kMaxUdpBufferSize), |
- kMaxUdpBufferSize, &addr_, |
- [this](int result) { this->OnStunResponseReceived(result); }); |
- if (rv != SocketInterface::IO_PENDING) { |
- OnStunResponseReceived(rv); |
- } |
-} |
- |
-void StunProber::Requester::Request::ProcessResponse( |
- rtc::ByteBuffer* message, |
- int buf_len, |
- const rtc::IPAddress& local_addr) { |
+void StunProber::Requester::Request::ProcessResponse(const char* buf, |
+ size_t buf_len) { |
int64 now = rtc::Time(); |
- |
+ rtc::ByteBuffer message(buf, buf_len); |
cricket::StunMessage stun_response; |
- if (!stun_response.Read(message)) { |
+ if (!stun_response.Read(&message)) { |
// Invalid or incomplete STUN packet. |
received_time_ms = 0; |
return; |
@@ -218,40 +195,25 @@ void StunProber::Requester::Request::ProcessResponse( |
received_time_ms = now; |
srflx_addr = addr_attr->GetAddress(); |
- |
- // Calculate behind_nat. |
- behind_nat = (srflx_addr.ipaddr() != local_addr); |
} |
-void StunProber::Requester::OnStunResponseReceived(int result) { |
+void StunProber::Requester::OnStunResponseReceived( |
+ rtc::AsyncPacketSocket* socket, |
+ const char* buf, |
+ size_t size, |
+ const rtc::SocketAddress& addr, |
+ const rtc::PacketTime& time) { |
DCHECK(thread_checker_.CalledOnValidThread()); |
DCHECK(socket_); |
- |
- if (result < 0) { |
- // Something is wrong, finish the test. |
- prober_->End(READ_FAILED, result); |
- return; |
- } |
- |
- Request* request = GetRequestByAddress(addr_.ipaddr()); |
+ Request* request = GetRequestByAddress(addr.ipaddr()); |
if (!request) { |
// Something is wrong, finish the test. |
- prober_->End(GENERIC_FAILURE, result); |
+ prober_->End(GENERIC_FAILURE); |
return; |
} |
num_response_received_++; |
- |
- // Resize will set the end_ to indicate that there are data available in this |
- // ByteBuffer. |
- response_packet_->Resize(result); |
- request->ProcessResponse(response_packet_.get(), result, |
- prober_->local_addr_); |
- |
- if (static_cast<size_t>(num_response_received_) < server_ips_.size()) { |
- // Post another read response. |
- ReadStunResponse(); |
- } |
+ request->ProcessResponse(buf, size); |
} |
StunProber::Requester::Request* StunProber::Requester::GetRequestByAddress( |
@@ -266,13 +228,13 @@ StunProber::Requester::Request* StunProber::Requester::GetRequestByAddress( |
return nullptr; |
} |
-StunProber::StunProber(HostNameResolverInterface* host_name_resolver, |
- SocketFactoryInterface* socket_factory, |
- TaskRunnerInterface* task_runner) |
+StunProber::StunProber(rtc::PacketSocketFactory* socket_factory, |
+ rtc::Thread* thread, |
+ const rtc::NetworkManager::NetworkList& networks) |
: interval_ms_(0), |
socket_factory_(socket_factory), |
- resolver_(host_name_resolver), |
- task_runner_(task_runner) { |
+ thread_(thread), |
+ networks_(networks) { |
} |
StunProber::~StunProber() { |
@@ -281,6 +243,11 @@ StunProber::~StunProber() { |
delete req; |
} |
} |
+ for (auto s : sockets_) { |
+ if (s) { |
+ delete s; |
+ } |
+ } |
} |
bool StunProber::Start(const std::vector<rtc::SocketAddress>& servers, |
@@ -301,92 +268,98 @@ bool StunProber::Start(const std::vector<rtc::SocketAddress>& servers, |
timeout_ms_ = timeout_ms; |
servers_ = servers; |
finished_callback_ = callback; |
- resolver_->Resolve(servers_[0], &resolved_ips_, |
- [this](int result) { this->OnServerResolved(0, result); }); |
+ return ResolveServerName(servers_.back()); |
+} |
+ |
+bool StunProber::ResolveServerName(const rtc::SocketAddress& addr) { |
+ rtc::AsyncResolverInterface* resolver = |
+ socket_factory_->CreateAsyncResolver(); |
+ if (!resolver) { |
+ return false; |
+ } |
+ resolver->SignalDone.connect(this, &StunProber::OnServerResolved); |
+ resolver->Start(addr); |
return true; |
} |
-void StunProber::OnServerResolved(int index, int result) { |
+void StunProber::OnSocketReady(rtc::AsyncPacketSocket* socket, |
+ const rtc::SocketAddress& addr) { |
+ total_ready_sockets_++; |
+ if (total_ready_sockets_ == total_socket_required()) { |
+ MaybeScheduleStunRequests(); |
+ } |
+} |
+ |
+void StunProber::OnServerResolved(rtc::AsyncResolverInterface* resolver) { |
DCHECK(thread_checker_.CalledOnValidThread()); |
- if (result == 0) { |
- all_servers_ips_.insert(all_servers_ips_.end(), resolved_ips_.begin(), |
- resolved_ips_.end()); |
- resolved_ips_.clear(); |
+ if (resolver->GetError() == 0) { |
+ rtc::SocketAddress addr(resolver->address().ipaddr(), |
+ resolver->address().port()); |
+ all_servers_addrs_.push_back(addr); |
} |
- index++; |
+ // Deletion of AsyncResolverInterface can't be done in OnResolveResult which |
+ // handles SignalDone. |
+ invoker_.AsyncInvoke<void>( |
+ thread_, |
+ rtc::Bind(&rtc::AsyncResolverInterface::Destroy, resolver, false)); |
+ servers_.pop_back(); |
- if (static_cast<size_t>(index) < servers_.size()) { |
- resolver_->Resolve( |
- servers_[index], &resolved_ips_, |
- [this, index](int result) { this->OnServerResolved(index, result); }); |
+ if (servers_.size()) { |
+ if (!ResolveServerName(servers_.back())) { |
+ End(RESOLVE_FAILED); |
+ } |
return; |
} |
- if (all_servers_ips_.size() == 0) { |
- End(RESOLVE_FAILED, result); |
+ if (all_servers_addrs_.size() == 0) { |
+ End(RESOLVE_FAILED); |
return; |
} |
// Dedupe. |
- std::set<rtc::SocketAddress> addrs(all_servers_ips_.begin(), |
- all_servers_ips_.end()); |
- all_servers_ips_.assign(addrs.begin(), addrs.end()); |
- |
- rtc::IPAddress addr; |
- if (GetLocalAddress(&addr) != 0) { |
- End(GENERIC_FAILURE, result); |
- return; |
- } |
- |
- socket_factory_->Prepare(GetTotalClientSockets(), GetTotalServerSockets(), |
- [this](int result) { |
- if (result == 0) { |
- this->MaybeScheduleStunRequests(); |
- } |
- }); |
-} |
- |
-int StunProber::GetLocalAddress(rtc::IPAddress* addr) { |
- DCHECK(thread_checker_.CalledOnValidThread()); |
- if (local_addr_.family() == AF_UNSPEC) { |
- rtc::SocketAddress sock_addr; |
- rtc::scoped_ptr<ClientSocketInterface> socket( |
- socket_factory_->CreateClientSocket()); |
- int rv = socket->Connect(all_servers_ips_[0]); |
- if (rv != SUCCESS) { |
- End(GENERIC_FAILURE, rv); |
- return rv; |
+ std::set<rtc::SocketAddress> addrs(all_servers_addrs_.begin(), |
+ all_servers_addrs_.end()); |
+ all_servers_addrs_.assign(addrs.begin(), addrs.end()); |
+ |
+ // Prepare all the sockets beforehand. All of them will bind to "any" address. |
+ while (sockets_.size() < total_socket_required()) { |
+ rtc::scoped_ptr<rtc::AsyncPacketSocket> socket( |
+ socket_factory_->CreateUdpSocket(rtc::SocketAddress(INADDR_ANY, 0), 0, |
+ 0)); |
+ if (!socket) { |
+ End(GENERIC_FAILURE); |
+ return; |
} |
- rv = socket->GetLocalAddress(&sock_addr); |
- if (rv != SUCCESS) { |
- End(GENERIC_FAILURE, rv); |
- return rv; |
+ // Chrome and WebRTC behave differently in terms of the state of a socket |
+ // once returned from PacketSocketFactory::CreateUdpSocket. |
+ if (socket->GetState() == rtc::AsyncPacketSocket::STATE_BINDING) { |
+ socket->SignalAddressReady.connect(this, &StunProber::OnSocketReady); |
+ } else { |
+ OnSocketReady(socket.get(), rtc::SocketAddress(INADDR_ANY, 0)); |
} |
- local_addr_ = sock_addr.ipaddr(); |
- socket->Close(); |
+ sockets_.push_back(socket.release()); |
} |
- *addr = local_addr_; |
- return 0; |
} |
StunProber::Requester* StunProber::CreateRequester() { |
DCHECK(thread_checker_.CalledOnValidThread()); |
- rtc::scoped_ptr<ServerSocketInterface> socket( |
- socket_factory_->CreateServerSocket(kMaxUdpBufferSize, |
- kMaxUdpBufferSize)); |
- if (!socket) { |
+ if (!sockets_.size()) { |
return nullptr; |
} |
+ StunProber::Requester* requester; |
if (shared_socket_mode_) { |
- return new Requester(this, socket.release(), all_servers_ips_); |
+ requester = new Requester(this, sockets_.back(), all_servers_addrs_); |
} else { |
std::vector<rtc::SocketAddress> server_ip; |
server_ip.push_back( |
- all_servers_ips_[(num_request_sent_ % all_servers_ips_.size())]); |
- return new Requester(this, socket.release(), server_ip); |
+ all_servers_addrs_[(num_request_sent_ % all_servers_addrs_.size())]); |
+ requester = new Requester(this, sockets_.back(), server_ip); |
} |
+ |
+ sockets_.pop_back(); |
+ return requester; |
} |
bool StunProber::SendNextRequest() { |
@@ -407,19 +380,20 @@ void StunProber::MaybeScheduleStunRequests() { |
uint32 now = rtc::Time(); |
if (Done()) { |
- task_runner_->PostTask(rtc::Bind(&StunProber::End, this, SUCCESS, 0), |
- timeout_ms_); |
+ invoker_.AsyncInvokeDelayed<void>( |
+ thread_, rtc::Bind(&StunProber::End, this, SUCCESS), timeout_ms_); |
return; |
} |
- if (now >= next_request_time_ms_) { |
+ if ((now + (thread_wake_up_interval_ms / 2)) >= next_request_time_ms_) { |
if (!SendNextRequest()) { |
- End(GENERIC_FAILURE, 0); |
+ End(GENERIC_FAILURE); |
return; |
} |
next_request_time_ms_ = now + interval_ms_; |
} |
- task_runner_->PostTask( |
- rtc::Bind(&StunProber::MaybeScheduleStunRequests, this), 1 /* ms */); |
+ invoker_.AsyncInvokeDelayed<void>( |
+ thread_, rtc::Bind(&StunProber::MaybeScheduleStunRequests, this), |
+ thread_wake_up_interval_ms /* ms */); |
} |
bool StunProber::GetStats(StunProber::Stats* prob_stats) const { |
@@ -464,14 +438,7 @@ bool StunProber::GetStats(StunProber::Stats* prob_stats) const { |
IncrementCounterByAddress(&num_response_per_server, request->server_addr); |
IncrementCounterByAddress(&num_response_per_srflx_addr, |
request->srflx_addr); |
- |
rtt_sum += request->rtt(); |
- if (nat_type == NATTYPE_INVALID) { |
- nat_type = request->behind_nat ? NATTYPE_UNKNOWN : NATTYPE_NONE; |
- } else if (behind_nat(nat_type) != request->behind_nat) { |
- // Detect the inconsistency in NAT presence. |
- return false; |
- } |
stats.srflx_addrs.insert(request->srflx_addr.ToString()); |
srflx_ips.insert(request->srflx_addr.ipaddr()); |
} |
@@ -505,18 +472,34 @@ bool StunProber::GetStats(StunProber::Stats* prob_stats) const { |
return false; |
} |
- stats.nat_type = nat_type; |
- |
// Shared mode is only true if we use the shared socket and there are more |
// than 1 responding servers. |
stats.shared_socket_mode = |
shared_socket_mode_ && (num_server_ip_with_response > 1); |
- if (stats.shared_socket_mode && nat_type == NATTYPE_UNKNOWN) { |
- stats.nat_type = NATTYPE_NON_SYMMETRIC; |
+ if (stats.shared_socket_mode && nat_type == NATTYPE_INVALID) { |
+ nat_type = NATTYPE_NON_SYMMETRIC; |
} |
- stats.host_ip = local_addr_.ToString(); |
+ // If we could find a local IP matching srflx, we're not behind a NAT. |
+ rtc::SocketAddress srflx_addr; |
+ if (!srflx_addr.FromString(*(stats.srflx_addrs.begin()))) { |
+ return false; |
+ } |
+ for (const auto& net : networks_) { |
+ if (srflx_addr.ipaddr() == net->GetBestIP()) { |
+ nat_type = stunprober::NATTYPE_NONE; |
+ stats.host_ip = net->GetBestIP().ToString(); |
+ break; |
+ } |
+ } |
+ |
+ // Finally, we know we're behind a NAT but can't determine which type it is. |
+ if (nat_type == NATTYPE_INVALID) { |
+ nat_type = NATTYPE_UNKNOWN; |
+ } |
+ |
+ stats.nat_type = nat_type; |
stats.num_request_sent = num_sent; |
stats.num_response_received = num_received; |
stats.target_request_interval_ns = interval_ms_ * 1000; |
@@ -538,14 +521,14 @@ bool StunProber::GetStats(StunProber::Stats* prob_stats) const { |
return true; |
} |
-void StunProber::End(StunProber::Status status, int result) { |
+void StunProber::End(StunProber::Status status) { |
DCHECK(thread_checker_.CalledOnValidThread()); |
if (!finished_callback_.empty()) { |
AsyncCallback callback = finished_callback_; |
finished_callback_ = AsyncCallback(); |
// Callback at the last since the prober might be deleted in the callback. |
- callback(status); |
+ callback(this, status); |
} |
} |