Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(372)

Side by Side Diff: packages/pool/lib/pool.dart

Issue 3015713002: Roll to pickup pool changes
Patch Set: Created 3 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « packages/pool/README.md ('k') | packages/pool/pubspec.yaml » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 import 'dart:collection'; 6 import 'dart:collection';
7 7
8 import 'package:async/async.dart'; 8 import 'package:async/async.dart';
9 import 'package:stack_trace/stack_trace.dart'; 9 import 'package:stack_trace/stack_trace.dart';
10 10
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
71 /// If any [PoolResource.allowRelease] callback throws an exception after the 71 /// If any [PoolResource.allowRelease] callback throws an exception after the
72 /// pool is closed, this completes with that exception. 72 /// pool is closed, this completes with that exception.
73 Future get done => _closeMemo.future; 73 Future get done => _closeMemo.future;
74 74
75 /// Creates a new pool with the given limit on how many resources may be 75 /// Creates a new pool with the given limit on how many resources may be
76 /// allocated at once. 76 /// allocated at once.
77 /// 77 ///
78 /// If [timeout] is passed, then if that much time passes without any activity 78 /// If [timeout] is passed, then if that much time passes without any activity
79 /// all pending [request] futures will throw a [TimeoutException]. This is 79 /// all pending [request] futures will throw a [TimeoutException]. This is
80 /// intended to avoid deadlocks. 80 /// intended to avoid deadlocks.
81 Pool(this._maxAllocatedResources, {Duration timeout}) 81 Pool(this._maxAllocatedResources, {Duration timeout}) : _timeout = timeout {
82 : _timeout = timeout {
83 if (timeout != null) { 82 if (timeout != null) {
84 // Start the timer canceled since we only want to start counting down once 83 // Start the timer canceled since we only want to start counting down once
85 // we've run out of available resources. 84 // we've run out of available resources.
86 _timer = new RestartableTimer(timeout, _onTimeout)..cancel(); 85 _timer = new RestartableTimer(timeout, _onTimeout)..cancel();
87 } 86 }
88 } 87 }
89 88
90 /// Request a [PoolResource]. 89 /// Request a [PoolResource].
91 /// 90 ///
92 /// If the maximum number of resources is already allocated, this will delay 91 /// If the maximum number of resources is already allocated, this will delay
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
133 /// Existing resource requests remain unchanged. 132 /// Existing resource requests remain unchanged.
134 /// 133 ///
135 /// Any resources that are marked as releasable using 134 /// Any resources that are marked as releasable using
136 /// [PoolResource.allowRelease] are released immediately. Once all resources 135 /// [PoolResource.allowRelease] are released immediately. Once all resources
137 /// have been released and any `onRelease` callbacks have completed, the 136 /// have been released and any `onRelease` callbacks have completed, the
138 /// returned future completes successfully. If any `onRelease` callback throws 137 /// returned future completes successfully. If any `onRelease` callback throws
139 /// an error, the returned future completes with that error. 138 /// an error, the returned future completes with that error.
140 /// 139 ///
141 /// This may be called more than once; it returns the same [Future] each time. 140 /// This may be called more than once; it returns the same [Future] each time.
142 Future close() => _closeMemo.runOnce(() { 141 Future close() => _closeMemo.runOnce(() {
143 if (_closeGroup != null) return _closeGroup.future; 142 if (_closeGroup != null) return _closeGroup.future;
144 143
145 _resetTimer(); 144 _resetTimer();
146 145
147 _closeGroup = new FutureGroup(); 146 _closeGroup = new FutureGroup();
148 for (var callback in _onReleaseCallbacks) { 147 for (var callback in _onReleaseCallbacks) {
149 _closeGroup.add(new Future.sync(callback)); 148 _closeGroup.add(new Future.sync(callback));
150 } 149 }
151 150
152 _allocatedResources -= _onReleaseCallbacks.length; 151 _allocatedResources -= _onReleaseCallbacks.length;
153 _onReleaseCallbacks.clear(); 152 _onReleaseCallbacks.clear();
154 153
155 if (_allocatedResources == 0) _closeGroup.close(); 154 if (_allocatedResources == 0) _closeGroup.close();
156 return _closeGroup.future; 155 return _closeGroup.future;
157 }); 156 });
158 final _closeMemo = new AsyncMemoizer(); 157 final _closeMemo = new AsyncMemoizer();
159 158
160 /// If there are any pending requests, this will fire the oldest one. 159 /// If there are any pending requests, this will fire the oldest one.
161 void _onResourceReleased() { 160 void _onResourceReleased() {
162 _resetTimer(); 161 _resetTimer();
163 162
164 if (_requestedResources.isNotEmpty) { 163 if (_requestedResources.isNotEmpty) {
165 var pending = _requestedResources.removeFirst(); 164 var pending = _requestedResources.removeFirst();
166 pending.complete(new PoolResource._(this)); 165 pending.complete(new PoolResource._(this));
167 } else { 166 } else {
168 _allocatedResources--; 167 _allocatedResources--;
169 if (isClosed && _allocatedResources == 0) _closeGroup.close(); 168 if (isClosed && _allocatedResources == 0) _closeGroup.close();
170 } 169 }
171 } 170 }
172 171
173 /// If there are any pending requests, this will fire the oldest one after 172 /// If there are any pending requests, this will fire the oldest one after
174 /// running [onRelease]. 173 /// running [onRelease].
175 void _onResourceReleaseAllowed(onRelease()) { 174 void _onResourceReleaseAllowed(onRelease()) {
176 _resetTimer(); 175 _resetTimer();
177 176
178 if (_requestedResources.isNotEmpty) { 177 if (_requestedResources.isNotEmpty) {
179 var pending = _requestedResources.removeFirst(); 178 var pending = _requestedResources.removeFirst();
180 pending.complete(_runOnRelease(onRelease)); 179 pending.complete(_runOnRelease(onRelease));
181 } else if (isClosed) { 180 } else if (isClosed) {
182 _closeGroup.add(new Future.sync(onRelease)); 181 _closeGroup.add(new Future.sync(onRelease));
183 _allocatedResources--; 182 _allocatedResources--;
184 if (_allocatedResources == 0) _closeGroup.close(); 183 if (_allocatedResources == 0) _closeGroup.close();
185 } else { 184 } else {
186 _onReleaseCallbacks.add( 185 var zone = Zone.current;
187 Zone.current.bindCallback(onRelease, runGuarded: false)); 186 var registered = zone.registerCallback(onRelease);
187 _onReleaseCallbacks.add(() => zone.run(registered));
188 } 188 }
189 } 189 }
190 190
191 /// Runs [onRelease] and returns a Future that completes to a resource once an 191 /// Runs [onRelease] and returns a Future that completes to a resource once an
192 /// [onRelease] callback completes. 192 /// [onRelease] callback completes.
193 /// 193 ///
194 /// Futures returned by [_runOnRelease] always complete in the order they were 194 /// Futures returned by [_runOnRelease] always complete in the order they were
195 /// created, even if earlier [onRelease] callbacks take longer to run. 195 /// created, even if earlier [onRelease] callbacks take longer to run.
196 Future<PoolResource> _runOnRelease(onRelease()) { 196 Future<PoolResource> _runOnRelease(onRelease()) {
197 new Future.sync(onRelease).then((value) { 197 new Future.sync(onRelease).then((value) {
(...skipping 16 matching lines...) Expand all
214 } else { 214 } else {
215 _timer.reset(); 215 _timer.reset();
216 } 216 }
217 } 217 }
218 218
219 /// Handles [_timer] timing out by causing all pending resource completers to 219 /// Handles [_timer] timing out by causing all pending resource completers to
220 /// emit exceptions. 220 /// emit exceptions.
221 void _onTimeout() { 221 void _onTimeout() {
222 for (var completer in _requestedResources) { 222 for (var completer in _requestedResources) {
223 completer.completeError( 223 completer.completeError(
224 new TimeoutException("Pool deadlock: all resources have been " 224 new TimeoutException(
225 "Pool deadlock: all resources have been "
225 "allocated for too long.", 226 "allocated for too long.",
226 _timeout), 227 _timeout),
227 new Chain.current()); 228 new Chain.current());
228 } 229 }
229 _requestedResources.clear(); 230 _requestedResources.clear();
230 _timer = null; 231 _timer = null;
231 } 232 }
232 } 233 }
233 234
234 /// A member of a [Pool]. 235 /// A member of a [Pool].
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
266 /// produce additional information later on. For example, an isolate's task 267 /// produce additional information later on. For example, an isolate's task
267 /// may be complete, but it could still emit asynchronous errors. 268 /// may be complete, but it could still emit asynchronous errors.
268 void allowRelease(onRelease()) { 269 void allowRelease(onRelease()) {
269 if (_released) { 270 if (_released) {
270 throw new StateError("A PoolResource may only be released once."); 271 throw new StateError("A PoolResource may only be released once.");
271 } 272 }
272 _released = true; 273 _released = true;
273 _pool._onResourceReleaseAllowed(onRelease); 274 _pool._onResourceReleaseAllowed(onRelease);
274 } 275 }
275 } 276 }
OLDNEW
« no previous file with comments | « packages/pool/README.md ('k') | packages/pool/pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698