Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(23)

Unified Diff: webrtc/p2p/stunprober/stunprober.cc

Issue 1173353002: Remove all glue interfaces and use existing webrtc interfaces (Closed) Base URL: https://chromium.googlesource.com/external/webrtc@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « webrtc/p2p/stunprober/stunprober.h ('k') | webrtc/p2p/stunprober/stunprober_dependencies.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: webrtc/p2p/stunprober/stunprober.cc
diff --git a/webrtc/p2p/stunprober/stunprober.cc b/webrtc/p2p/stunprober/stunprober.cc
index 8efe069b0b52f1841e59978a24023d087415e881..4a33444cc0545bcb7c78e90d9dc27a1ce66fcec0 100644
--- a/webrtc/p2p/stunprober/stunprober.cc
+++ b/webrtc/p2p/stunprober/stunprober.cc
@@ -13,10 +13,15 @@
#include <set>
#include <string>
+#include "webrtc/base/asyncpacketsocket.h"
+#include "webrtc/base/asyncresolverinterface.h"
#include "webrtc/base/bind.h"
#include "webrtc/base/checks.h"
#include "webrtc/base/helpers.h"
+#include "webrtc/base/logging.h"
#include "webrtc/base/timeutils.h"
+#include "webrtc/base/thread.h"
+#include "webrtc/p2p/base/packetsocketfactory.h"
#include "webrtc/p2p/base/stun.h"
#include "webrtc/p2p/stunprober/stunprober.h"
@@ -24,20 +29,18 @@ namespace stunprober {
namespace {
+const int thread_wake_up_interval_ms = 5;
+
template <typename T>
void IncrementCounterByAddress(std::map<T, int>* counter_per_ip, const T& ip) {
counter_per_ip->insert(std::make_pair(ip, 0)).first->second++;
}
-bool behind_nat(NatType nat_type) {
- return nat_type > stunprober::NATTYPE_NONE;
-}
-
} // namespace
// A requester tracks the requests and responses from a single socket to many
// STUN servers
-class StunProber::Requester {
+class StunProber::Requester : public sigslot::has_slots<> {
public:
// Each Request maps to a request and response.
struct Request {
@@ -46,26 +49,20 @@ class StunProber::Requester {
// Time the response was received.
int64 received_time_ms = 0;
- // See whether the observed address returned matches the
- // local address as in StunProber.local_addr_.
- bool behind_nat = false;
-
// Server reflexive address from STUN response for this given request.
rtc::SocketAddress srflx_addr;
rtc::IPAddress server_addr;
int64 rtt() { return received_time_ms - sent_time_ms; }
- void ProcessResponse(rtc::ByteBuffer* message,
- int buf_len,
- const rtc::IPAddress& local_addr);
+ void ProcessResponse(const char* buf, size_t buf_len);
};
// StunProber provides |server_ips| for Requester to probe. For shared
// socket mode, it'll be all the resolved IP addresses. For non-shared mode,
// it'll just be a single address.
Requester(StunProber* prober,
- ServerSocketInterface* socket,
+ rtc::AsyncPacketSocket* socket,
const std::vector<rtc::SocketAddress>& server_ips);
virtual ~Requester();
@@ -74,11 +71,11 @@ class StunProber::Requester {
// and move to the next one.
void SendStunRequest();
- void ReadStunResponse();
-
- // |result| is the positive return value from RecvFrom when data is
- // available.
- void OnStunResponseReceived(int result);
+ void OnStunResponseReceived(rtc::AsyncPacketSocket* socket,
+ const char* buf,
+ size_t size,
+ const rtc::SocketAddress& addr,
+ const rtc::PacketTime& time);
const std::vector<Request*>& requests() { return requests_; }
@@ -93,7 +90,7 @@ class StunProber::Requester {
StunProber* prober_;
// The socket for this session.
- rtc::scoped_ptr<ServerSocketInterface> socket_;
+ rtc::scoped_ptr<rtc::AsyncPacketSocket> socket_;
// Temporary SocketAddress and buffer for RecvFrom.
rtc::SocketAddress addr_;
@@ -111,13 +108,15 @@ class StunProber::Requester {
StunProber::Requester::Requester(
StunProber* prober,
- ServerSocketInterface* socket,
+ rtc::AsyncPacketSocket* socket,
const std::vector<rtc::SocketAddress>& server_ips)
: prober_(prober),
socket_(socket),
response_packet_(new rtc::ByteBuffer(nullptr, kMaxUdpBufferSize)),
server_ips_(server_ips),
thread_checker_(prober->thread_checker_) {
+ socket_->SignalReadPacket.connect(
+ this, &StunProber::Requester::OnStunResponseReceived);
}
StunProber::Requester::~Requester() {
@@ -145,7 +144,7 @@ void StunProber::Requester::SendStunRequest() {
rtc::scoped_ptr<rtc::ByteBuffer> request_packet(
new rtc::ByteBuffer(nullptr, kMaxUdpBufferSize));
if (!message.Write(request_packet.get())) {
- prober_->End(WRITE_FAILED, 0);
+ prober_->End(WRITE_FAILED);
return;
}
@@ -155,48 +154,26 @@ void StunProber::Requester::SendStunRequest() {
// The write must succeed immediately. Otherwise, the calculating of the STUN
// request timing could become too complicated. Callback is ignored by passing
// empty AsyncCallback.
- int rv = socket_->SendTo(addr, const_cast<char*>(request_packet->Data()),
- request_packet->Length(), AsyncCallback());
+ rtc::PacketOptions options;
+ int rv = socket_->SendTo(const_cast<char*>(request_packet->Data()),
+ request_packet->Length(), addr, options);
if (rv < 0) {
- prober_->End(WRITE_FAILED, rv);
+ prober_->End(WRITE_FAILED);
return;
}
request.sent_time_ms = rtc::Time();
- // Post a read waiting for response. For share mode, the subsequent read will
- // be posted inside OnStunResponseReceived.
- if (num_request_sent_ == 0) {
- ReadStunResponse();
- }
-
num_request_sent_++;
DCHECK(static_cast<size_t>(num_request_sent_) <= server_ips_.size());
}
-void StunProber::Requester::ReadStunResponse() {
- DCHECK(thread_checker_.CalledOnValidThread());
- if (!socket_) {
- return;
- }
-
- int rv = socket_->RecvFrom(
- response_packet_->ReserveWriteBuffer(kMaxUdpBufferSize),
- kMaxUdpBufferSize, &addr_,
- [this](int result) { this->OnStunResponseReceived(result); });
- if (rv != SocketInterface::IO_PENDING) {
- OnStunResponseReceived(rv);
- }
-}
-
-void StunProber::Requester::Request::ProcessResponse(
- rtc::ByteBuffer* message,
- int buf_len,
- const rtc::IPAddress& local_addr) {
+void StunProber::Requester::Request::ProcessResponse(const char* buf,
+ size_t buf_len) {
int64 now = rtc::Time();
-
+ rtc::ByteBuffer message(buf, buf_len);
cricket::StunMessage stun_response;
- if (!stun_response.Read(message)) {
+ if (!stun_response.Read(&message)) {
// Invalid or incomplete STUN packet.
received_time_ms = 0;
return;
@@ -218,40 +195,25 @@ void StunProber::Requester::Request::ProcessResponse(
received_time_ms = now;
srflx_addr = addr_attr->GetAddress();
-
- // Calculate behind_nat.
- behind_nat = (srflx_addr.ipaddr() != local_addr);
}
-void StunProber::Requester::OnStunResponseReceived(int result) {
+void StunProber::Requester::OnStunResponseReceived(
+ rtc::AsyncPacketSocket* socket,
+ const char* buf,
+ size_t size,
+ const rtc::SocketAddress& addr,
+ const rtc::PacketTime& time) {
DCHECK(thread_checker_.CalledOnValidThread());
DCHECK(socket_);
-
- if (result < 0) {
- // Something is wrong, finish the test.
- prober_->End(READ_FAILED, result);
- return;
- }
-
- Request* request = GetRequestByAddress(addr_.ipaddr());
+ Request* request = GetRequestByAddress(addr.ipaddr());
if (!request) {
// Something is wrong, finish the test.
- prober_->End(GENERIC_FAILURE, result);
+ prober_->End(GENERIC_FAILURE);
return;
}
num_response_received_++;
-
- // Resize will set the end_ to indicate that there are data available in this
- // ByteBuffer.
- response_packet_->Resize(result);
- request->ProcessResponse(response_packet_.get(), result,
- prober_->local_addr_);
-
- if (static_cast<size_t>(num_response_received_) < server_ips_.size()) {
- // Post another read response.
- ReadStunResponse();
- }
+ request->ProcessResponse(buf, size);
}
StunProber::Requester::Request* StunProber::Requester::GetRequestByAddress(
@@ -266,13 +228,13 @@ StunProber::Requester::Request* StunProber::Requester::GetRequestByAddress(
return nullptr;
}
-StunProber::StunProber(HostNameResolverInterface* host_name_resolver,
- SocketFactoryInterface* socket_factory,
- TaskRunnerInterface* task_runner)
+StunProber::StunProber(rtc::PacketSocketFactory* socket_factory,
+ rtc::Thread* thread,
+ const rtc::NetworkManager::NetworkList& networks)
: interval_ms_(0),
socket_factory_(socket_factory),
- resolver_(host_name_resolver),
- task_runner_(task_runner) {
+ thread_(thread),
+ networks_(networks) {
}
StunProber::~StunProber() {
@@ -281,6 +243,11 @@ StunProber::~StunProber() {
delete req;
}
}
+ for (auto s : sockets_) {
+ if (s) {
+ delete s;
+ }
+ }
}
bool StunProber::Start(const std::vector<rtc::SocketAddress>& servers,
@@ -301,92 +268,98 @@ bool StunProber::Start(const std::vector<rtc::SocketAddress>& servers,
timeout_ms_ = timeout_ms;
servers_ = servers;
finished_callback_ = callback;
- resolver_->Resolve(servers_[0], &resolved_ips_,
- [this](int result) { this->OnServerResolved(0, result); });
+ return ResolveServerName(servers_.back());
+}
+
+bool StunProber::ResolveServerName(const rtc::SocketAddress& addr) {
+ rtc::AsyncResolverInterface* resolver =
+ socket_factory_->CreateAsyncResolver();
+ if (!resolver) {
+ return false;
+ }
+ resolver->SignalDone.connect(this, &StunProber::OnServerResolved);
+ resolver->Start(addr);
return true;
}
-void StunProber::OnServerResolved(int index, int result) {
+void StunProber::OnSocketReady(rtc::AsyncPacketSocket* socket,
+ const rtc::SocketAddress& addr) {
+ total_ready_sockets_++;
+ if (total_ready_sockets_ == total_socket_required()) {
+ MaybeScheduleStunRequests();
+ }
+}
+
+void StunProber::OnServerResolved(rtc::AsyncResolverInterface* resolver) {
DCHECK(thread_checker_.CalledOnValidThread());
- if (result == 0) {
- all_servers_ips_.insert(all_servers_ips_.end(), resolved_ips_.begin(),
- resolved_ips_.end());
- resolved_ips_.clear();
+ if (resolver->GetError() == 0) {
+ rtc::SocketAddress addr(resolver->address().ipaddr(),
+ resolver->address().port());
+ all_servers_addrs_.push_back(addr);
}
- index++;
+ // Deletion of AsyncResolverInterface can't be done in OnResolveResult which
+ // handles SignalDone.
+ invoker_.AsyncInvoke<void>(
+ thread_,
+ rtc::Bind(&rtc::AsyncResolverInterface::Destroy, resolver, false));
+ servers_.pop_back();
- if (static_cast<size_t>(index) < servers_.size()) {
- resolver_->Resolve(
- servers_[index], &resolved_ips_,
- [this, index](int result) { this->OnServerResolved(index, result); });
+ if (servers_.size()) {
+ if (!ResolveServerName(servers_.back())) {
+ End(RESOLVE_FAILED);
+ }
return;
}
- if (all_servers_ips_.size() == 0) {
- End(RESOLVE_FAILED, result);
+ if (all_servers_addrs_.size() == 0) {
+ End(RESOLVE_FAILED);
return;
}
// Dedupe.
- std::set<rtc::SocketAddress> addrs(all_servers_ips_.begin(),
- all_servers_ips_.end());
- all_servers_ips_.assign(addrs.begin(), addrs.end());
-
- rtc::IPAddress addr;
- if (GetLocalAddress(&addr) != 0) {
- End(GENERIC_FAILURE, result);
- return;
- }
-
- socket_factory_->Prepare(GetTotalClientSockets(), GetTotalServerSockets(),
- [this](int result) {
- if (result == 0) {
- this->MaybeScheduleStunRequests();
- }
- });
-}
-
-int StunProber::GetLocalAddress(rtc::IPAddress* addr) {
- DCHECK(thread_checker_.CalledOnValidThread());
- if (local_addr_.family() == AF_UNSPEC) {
- rtc::SocketAddress sock_addr;
- rtc::scoped_ptr<ClientSocketInterface> socket(
- socket_factory_->CreateClientSocket());
- int rv = socket->Connect(all_servers_ips_[0]);
- if (rv != SUCCESS) {
- End(GENERIC_FAILURE, rv);
- return rv;
+ std::set<rtc::SocketAddress> addrs(all_servers_addrs_.begin(),
+ all_servers_addrs_.end());
+ all_servers_addrs_.assign(addrs.begin(), addrs.end());
+
+ // Prepare all the sockets beforehand. All of them will bind to "any" address.
+ while (sockets_.size() < total_socket_required()) {
+ rtc::scoped_ptr<rtc::AsyncPacketSocket> socket(
+ socket_factory_->CreateUdpSocket(rtc::SocketAddress(INADDR_ANY, 0), 0,
+ 0));
+ if (!socket) {
+ End(GENERIC_FAILURE);
+ return;
}
- rv = socket->GetLocalAddress(&sock_addr);
- if (rv != SUCCESS) {
- End(GENERIC_FAILURE, rv);
- return rv;
+ // Chrome and WebRTC behave differently in terms of the state of a socket
+ // once returned from PacketSocketFactory::CreateUdpSocket.
+ if (socket->GetState() == rtc::AsyncPacketSocket::STATE_BINDING) {
+ socket->SignalAddressReady.connect(this, &StunProber::OnSocketReady);
+ } else {
+ OnSocketReady(socket.get(), rtc::SocketAddress(INADDR_ANY, 0));
}
- local_addr_ = sock_addr.ipaddr();
- socket->Close();
+ sockets_.push_back(socket.release());
}
- *addr = local_addr_;
- return 0;
}
StunProber::Requester* StunProber::CreateRequester() {
DCHECK(thread_checker_.CalledOnValidThread());
- rtc::scoped_ptr<ServerSocketInterface> socket(
- socket_factory_->CreateServerSocket(kMaxUdpBufferSize,
- kMaxUdpBufferSize));
- if (!socket) {
+ if (!sockets_.size()) {
return nullptr;
}
+ StunProber::Requester* requester;
if (shared_socket_mode_) {
- return new Requester(this, socket.release(), all_servers_ips_);
+ requester = new Requester(this, sockets_.back(), all_servers_addrs_);
} else {
std::vector<rtc::SocketAddress> server_ip;
server_ip.push_back(
- all_servers_ips_[(num_request_sent_ % all_servers_ips_.size())]);
- return new Requester(this, socket.release(), server_ip);
+ all_servers_addrs_[(num_request_sent_ % all_servers_addrs_.size())]);
+ requester = new Requester(this, sockets_.back(), server_ip);
}
+
+ sockets_.pop_back();
+ return requester;
}
bool StunProber::SendNextRequest() {
@@ -407,19 +380,20 @@ void StunProber::MaybeScheduleStunRequests() {
uint32 now = rtc::Time();
if (Done()) {
- task_runner_->PostTask(rtc::Bind(&StunProber::End, this, SUCCESS, 0),
- timeout_ms_);
+ invoker_.AsyncInvokeDelayed<void>(
+ thread_, rtc::Bind(&StunProber::End, this, SUCCESS), timeout_ms_);
return;
}
- if (now >= next_request_time_ms_) {
+ if ((now + (thread_wake_up_interval_ms / 2)) >= next_request_time_ms_) {
if (!SendNextRequest()) {
- End(GENERIC_FAILURE, 0);
+ End(GENERIC_FAILURE);
return;
}
next_request_time_ms_ = now + interval_ms_;
}
- task_runner_->PostTask(
- rtc::Bind(&StunProber::MaybeScheduleStunRequests, this), 1 /* ms */);
+ invoker_.AsyncInvokeDelayed<void>(
+ thread_, rtc::Bind(&StunProber::MaybeScheduleStunRequests, this),
+ thread_wake_up_interval_ms /* ms */);
}
bool StunProber::GetStats(StunProber::Stats* prob_stats) const {
@@ -464,14 +438,7 @@ bool StunProber::GetStats(StunProber::Stats* prob_stats) const {
IncrementCounterByAddress(&num_response_per_server, request->server_addr);
IncrementCounterByAddress(&num_response_per_srflx_addr,
request->srflx_addr);
-
rtt_sum += request->rtt();
- if (nat_type == NATTYPE_INVALID) {
- nat_type = request->behind_nat ? NATTYPE_UNKNOWN : NATTYPE_NONE;
- } else if (behind_nat(nat_type) != request->behind_nat) {
- // Detect the inconsistency in NAT presence.
- return false;
- }
stats.srflx_addrs.insert(request->srflx_addr.ToString());
srflx_ips.insert(request->srflx_addr.ipaddr());
}
@@ -505,18 +472,34 @@ bool StunProber::GetStats(StunProber::Stats* prob_stats) const {
return false;
}
- stats.nat_type = nat_type;
-
// Shared mode is only true if we use the shared socket and there are more
// than 1 responding servers.
stats.shared_socket_mode =
shared_socket_mode_ && (num_server_ip_with_response > 1);
- if (stats.shared_socket_mode && nat_type == NATTYPE_UNKNOWN) {
- stats.nat_type = NATTYPE_NON_SYMMETRIC;
+ if (stats.shared_socket_mode && nat_type == NATTYPE_INVALID) {
+ nat_type = NATTYPE_NON_SYMMETRIC;
}
- stats.host_ip = local_addr_.ToString();
+ // If we could find a local IP matching srflx, we're not behind a NAT.
+ rtc::SocketAddress srflx_addr;
+ if (!srflx_addr.FromString(*(stats.srflx_addrs.begin()))) {
+ return false;
+ }
+ for (const auto& net : networks_) {
+ if (srflx_addr.ipaddr() == net->GetBestIP()) {
+ nat_type = stunprober::NATTYPE_NONE;
+ stats.host_ip = net->GetBestIP().ToString();
+ break;
+ }
+ }
+
+ // Finally, we know we're behind a NAT but can't determine which type it is.
+ if (nat_type == NATTYPE_INVALID) {
+ nat_type = NATTYPE_UNKNOWN;
+ }
+
+ stats.nat_type = nat_type;
stats.num_request_sent = num_sent;
stats.num_response_received = num_received;
stats.target_request_interval_ns = interval_ms_ * 1000;
@@ -538,14 +521,14 @@ bool StunProber::GetStats(StunProber::Stats* prob_stats) const {
return true;
}
-void StunProber::End(StunProber::Status status, int result) {
+void StunProber::End(StunProber::Status status) {
DCHECK(thread_checker_.CalledOnValidThread());
if (!finished_callback_.empty()) {
AsyncCallback callback = finished_callback_;
finished_callback_ = AsyncCallback();
// Callback at the last since the prober might be deleted in the callback.
- callback(status);
+ callback(this, status);
}
}
« no previous file with comments | « webrtc/p2p/stunprober/stunprober.h ('k') | webrtc/p2p/stunprober/stunprober_dependencies.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698