OLD | NEW |
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 import 'dart:collection'; | 6 import 'dart:collection'; |
7 | 7 |
8 import 'package:async/async.dart'; | 8 import 'package:async/async.dart'; |
9 import 'package:stack_trace/stack_trace.dart'; | 9 import 'package:stack_trace/stack_trace.dart'; |
10 | 10 |
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
71 /// If any [PoolResource.allowRelease] callback throws an exception after the | 71 /// If any [PoolResource.allowRelease] callback throws an exception after the |
72 /// pool is closed, this completes with that exception. | 72 /// pool is closed, this completes with that exception. |
73 Future get done => _closeMemo.future; | 73 Future get done => _closeMemo.future; |
74 | 74 |
75 /// Creates a new pool with the given limit on how many resources may be | 75 /// Creates a new pool with the given limit on how many resources may be |
76 /// allocated at once. | 76 /// allocated at once. |
77 /// | 77 /// |
78 /// If [timeout] is passed, then if that much time passes without any activity | 78 /// If [timeout] is passed, then if that much time passes without any activity |
79 /// all pending [request] futures will throw a [TimeoutException]. This is | 79 /// all pending [request] futures will throw a [TimeoutException]. This is |
80 /// intended to avoid deadlocks. | 80 /// intended to avoid deadlocks. |
81 Pool(this._maxAllocatedResources, {Duration timeout}) | 81 Pool(this._maxAllocatedResources, {Duration timeout}) : _timeout = timeout { |
82 : _timeout = timeout { | |
83 if (timeout != null) { | 82 if (timeout != null) { |
84 // Start the timer canceled since we only want to start counting down once | 83 // Start the timer canceled since we only want to start counting down once |
85 // we've run out of available resources. | 84 // we've run out of available resources. |
86 _timer = new RestartableTimer(timeout, _onTimeout)..cancel(); | 85 _timer = new RestartableTimer(timeout, _onTimeout)..cancel(); |
87 } | 86 } |
88 } | 87 } |
89 | 88 |
90 /// Request a [PoolResource]. | 89 /// Request a [PoolResource]. |
91 /// | 90 /// |
92 /// If the maximum number of resources is already allocated, this will delay | 91 /// If the maximum number of resources is already allocated, this will delay |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
133 /// Existing resource requests remain unchanged. | 132 /// Existing resource requests remain unchanged. |
134 /// | 133 /// |
135 /// Any resources that are marked as releasable using | 134 /// Any resources that are marked as releasable using |
136 /// [PoolResource.allowRelease] are released immediately. Once all resources | 135 /// [PoolResource.allowRelease] are released immediately. Once all resources |
137 /// have been released and any `onRelease` callbacks have completed, the | 136 /// have been released and any `onRelease` callbacks have completed, the |
138 /// returned future completes successfully. If any `onRelease` callback throws | 137 /// returned future completes successfully. If any `onRelease` callback throws |
139 /// an error, the returned future completes with that error. | 138 /// an error, the returned future completes with that error. |
140 /// | 139 /// |
141 /// This may be called more than once; it returns the same [Future] each time. | 140 /// This may be called more than once; it returns the same [Future] each time. |
142 Future close() => _closeMemo.runOnce(() { | 141 Future close() => _closeMemo.runOnce(() { |
143 if (_closeGroup != null) return _closeGroup.future; | 142 if (_closeGroup != null) return _closeGroup.future; |
144 | 143 |
145 _resetTimer(); | 144 _resetTimer(); |
146 | 145 |
147 _closeGroup = new FutureGroup(); | 146 _closeGroup = new FutureGroup(); |
148 for (var callback in _onReleaseCallbacks) { | 147 for (var callback in _onReleaseCallbacks) { |
149 _closeGroup.add(new Future.sync(callback)); | 148 _closeGroup.add(new Future.sync(callback)); |
150 } | 149 } |
151 | 150 |
152 _allocatedResources -= _onReleaseCallbacks.length; | 151 _allocatedResources -= _onReleaseCallbacks.length; |
153 _onReleaseCallbacks.clear(); | 152 _onReleaseCallbacks.clear(); |
154 | 153 |
155 if (_allocatedResources == 0) _closeGroup.close(); | 154 if (_allocatedResources == 0) _closeGroup.close(); |
156 return _closeGroup.future; | 155 return _closeGroup.future; |
157 }); | 156 }); |
158 final _closeMemo = new AsyncMemoizer(); | 157 final _closeMemo = new AsyncMemoizer(); |
159 | 158 |
160 /// If there are any pending requests, this will fire the oldest one. | 159 /// If there are any pending requests, this will fire the oldest one. |
161 void _onResourceReleased() { | 160 void _onResourceReleased() { |
162 _resetTimer(); | 161 _resetTimer(); |
163 | 162 |
164 if (_requestedResources.isNotEmpty) { | 163 if (_requestedResources.isNotEmpty) { |
165 var pending = _requestedResources.removeFirst(); | 164 var pending = _requestedResources.removeFirst(); |
166 pending.complete(new PoolResource._(this)); | 165 pending.complete(new PoolResource._(this)); |
167 } else { | 166 } else { |
168 _allocatedResources--; | 167 _allocatedResources--; |
169 if (isClosed && _allocatedResources == 0) _closeGroup.close(); | 168 if (isClosed && _allocatedResources == 0) _closeGroup.close(); |
170 } | 169 } |
171 } | 170 } |
172 | 171 |
173 /// If there are any pending requests, this will fire the oldest one after | 172 /// If there are any pending requests, this will fire the oldest one after |
174 /// running [onRelease]. | 173 /// running [onRelease]. |
175 void _onResourceReleaseAllowed(onRelease()) { | 174 void _onResourceReleaseAllowed(onRelease()) { |
176 _resetTimer(); | 175 _resetTimer(); |
177 | 176 |
178 if (_requestedResources.isNotEmpty) { | 177 if (_requestedResources.isNotEmpty) { |
179 var pending = _requestedResources.removeFirst(); | 178 var pending = _requestedResources.removeFirst(); |
180 pending.complete(_runOnRelease(onRelease)); | 179 pending.complete(_runOnRelease(onRelease)); |
181 } else if (isClosed) { | 180 } else if (isClosed) { |
182 _closeGroup.add(new Future.sync(onRelease)); | 181 _closeGroup.add(new Future.sync(onRelease)); |
183 _allocatedResources--; | 182 _allocatedResources--; |
184 if (_allocatedResources == 0) _closeGroup.close(); | 183 if (_allocatedResources == 0) _closeGroup.close(); |
185 } else { | 184 } else { |
186 _onReleaseCallbacks.add( | 185 var zone = Zone.current; |
187 Zone.current.bindCallback(onRelease, runGuarded: false)); | 186 var registered = zone.registerCallback(onRelease); |
| 187 _onReleaseCallbacks.add(() => zone.run(registered)); |
188 } | 188 } |
189 } | 189 } |
190 | 190 |
191 /// Runs [onRelease] and returns a Future that completes to a resource once an | 191 /// Runs [onRelease] and returns a Future that completes to a resource once an |
192 /// [onRelease] callback completes. | 192 /// [onRelease] callback completes. |
193 /// | 193 /// |
194 /// Futures returned by [_runOnRelease] always complete in the order they were | 194 /// Futures returned by [_runOnRelease] always complete in the order they were |
195 /// created, even if earlier [onRelease] callbacks take longer to run. | 195 /// created, even if earlier [onRelease] callbacks take longer to run. |
196 Future<PoolResource> _runOnRelease(onRelease()) { | 196 Future<PoolResource> _runOnRelease(onRelease()) { |
197 new Future.sync(onRelease).then((value) { | 197 new Future.sync(onRelease).then((value) { |
(...skipping 16 matching lines...) Expand all Loading... |
214 } else { | 214 } else { |
215 _timer.reset(); | 215 _timer.reset(); |
216 } | 216 } |
217 } | 217 } |
218 | 218 |
219 /// Handles [_timer] timing out by causing all pending resource completers to | 219 /// Handles [_timer] timing out by causing all pending resource completers to |
220 /// emit exceptions. | 220 /// emit exceptions. |
221 void _onTimeout() { | 221 void _onTimeout() { |
222 for (var completer in _requestedResources) { | 222 for (var completer in _requestedResources) { |
223 completer.completeError( | 223 completer.completeError( |
224 new TimeoutException("Pool deadlock: all resources have been " | 224 new TimeoutException( |
| 225 "Pool deadlock: all resources have been " |
225 "allocated for too long.", | 226 "allocated for too long.", |
226 _timeout), | 227 _timeout), |
227 new Chain.current()); | 228 new Chain.current()); |
228 } | 229 } |
229 _requestedResources.clear(); | 230 _requestedResources.clear(); |
230 _timer = null; | 231 _timer = null; |
231 } | 232 } |
232 } | 233 } |
233 | 234 |
234 /// A member of a [Pool]. | 235 /// A member of a [Pool]. |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
266 /// produce additional information later on. For example, an isolate's task | 267 /// produce additional information later on. For example, an isolate's task |
267 /// may be complete, but it could still emit asynchronous errors. | 268 /// may be complete, but it could still emit asynchronous errors. |
268 void allowRelease(onRelease()) { | 269 void allowRelease(onRelease()) { |
269 if (_released) { | 270 if (_released) { |
270 throw new StateError("A PoolResource may only be released once."); | 271 throw new StateError("A PoolResource may only be released once."); |
271 } | 272 } |
272 _released = true; | 273 _released = true; |
273 _pool._onResourceReleaseAllowed(onRelease); | 274 _pool._onResourceReleaseAllowed(onRelease); |
274 } | 275 } |
275 } | 276 } |
OLD | NEW |