| OLD | NEW |
| 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import 'dart:async'; | 5 import 'dart:async'; |
| 6 import 'dart:collection'; | 6 import 'dart:collection'; |
| 7 | 7 |
| 8 import 'package:async/async.dart'; | 8 import 'package:async/async.dart'; |
| 9 import 'package:stack_trace/stack_trace.dart'; | 9 import 'package:stack_trace/stack_trace.dart'; |
| 10 | 10 |
| (...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 71 /// If any [PoolResource.allowRelease] callback throws an exception after the | 71 /// If any [PoolResource.allowRelease] callback throws an exception after the |
| 72 /// pool is closed, this completes with that exception. | 72 /// pool is closed, this completes with that exception. |
| 73 Future get done => _closeMemo.future; | 73 Future get done => _closeMemo.future; |
| 74 | 74 |
| 75 /// Creates a new pool with the given limit on how many resources may be | 75 /// Creates a new pool with the given limit on how many resources may be |
| 76 /// allocated at once. | 76 /// allocated at once. |
| 77 /// | 77 /// |
| 78 /// If [timeout] is passed, then if that much time passes without any activity | 78 /// If [timeout] is passed, then if that much time passes without any activity |
| 79 /// all pending [request] futures will throw a [TimeoutException]. This is | 79 /// all pending [request] futures will throw a [TimeoutException]. This is |
| 80 /// intended to avoid deadlocks. | 80 /// intended to avoid deadlocks. |
| 81 Pool(this._maxAllocatedResources, {Duration timeout}) | 81 Pool(this._maxAllocatedResources, {Duration timeout}) : _timeout = timeout { |
| 82 : _timeout = timeout { | |
| 83 if (timeout != null) { | 82 if (timeout != null) { |
| 84 // Start the timer canceled since we only want to start counting down once | 83 // Start the timer canceled since we only want to start counting down once |
| 85 // we've run out of available resources. | 84 // we've run out of available resources. |
| 86 _timer = new RestartableTimer(timeout, _onTimeout)..cancel(); | 85 _timer = new RestartableTimer(timeout, _onTimeout)..cancel(); |
| 87 } | 86 } |
| 88 } | 87 } |
| 89 | 88 |
| 90 /// Request a [PoolResource]. | 89 /// Request a [PoolResource]. |
| 91 /// | 90 /// |
| 92 /// If the maximum number of resources is already allocated, this will delay | 91 /// If the maximum number of resources is already allocated, this will delay |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 133 /// Existing resource requests remain unchanged. | 132 /// Existing resource requests remain unchanged. |
| 134 /// | 133 /// |
| 135 /// Any resources that are marked as releasable using | 134 /// Any resources that are marked as releasable using |
| 136 /// [PoolResource.allowRelease] are released immediately. Once all resources | 135 /// [PoolResource.allowRelease] are released immediately. Once all resources |
| 137 /// have been released and any `onRelease` callbacks have completed, the | 136 /// have been released and any `onRelease` callbacks have completed, the |
| 138 /// returned future completes successfully. If any `onRelease` callback throws | 137 /// returned future completes successfully. If any `onRelease` callback throws |
| 139 /// an error, the returned future completes with that error. | 138 /// an error, the returned future completes with that error. |
| 140 /// | 139 /// |
| 141 /// This may be called more than once; it returns the same [Future] each time. | 140 /// This may be called more than once; it returns the same [Future] each time. |
| 142 Future close() => _closeMemo.runOnce(() { | 141 Future close() => _closeMemo.runOnce(() { |
| 143 if (_closeGroup != null) return _closeGroup.future; | 142 if (_closeGroup != null) return _closeGroup.future; |
| 144 | 143 |
| 145 _resetTimer(); | 144 _resetTimer(); |
| 146 | 145 |
| 147 _closeGroup = new FutureGroup(); | 146 _closeGroup = new FutureGroup(); |
| 148 for (var callback in _onReleaseCallbacks) { | 147 for (var callback in _onReleaseCallbacks) { |
| 149 _closeGroup.add(new Future.sync(callback)); | 148 _closeGroup.add(new Future.sync(callback)); |
| 150 } | 149 } |
| 151 | 150 |
| 152 _allocatedResources -= _onReleaseCallbacks.length; | 151 _allocatedResources -= _onReleaseCallbacks.length; |
| 153 _onReleaseCallbacks.clear(); | 152 _onReleaseCallbacks.clear(); |
| 154 | 153 |
| 155 if (_allocatedResources == 0) _closeGroup.close(); | 154 if (_allocatedResources == 0) _closeGroup.close(); |
| 156 return _closeGroup.future; | 155 return _closeGroup.future; |
| 157 }); | 156 }); |
| 158 final _closeMemo = new AsyncMemoizer(); | 157 final _closeMemo = new AsyncMemoizer(); |
| 159 | 158 |
| 160 /// If there are any pending requests, this will fire the oldest one. | 159 /// If there are any pending requests, this will fire the oldest one. |
| 161 void _onResourceReleased() { | 160 void _onResourceReleased() { |
| 162 _resetTimer(); | 161 _resetTimer(); |
| 163 | 162 |
| 164 if (_requestedResources.isNotEmpty) { | 163 if (_requestedResources.isNotEmpty) { |
| 165 var pending = _requestedResources.removeFirst(); | 164 var pending = _requestedResources.removeFirst(); |
| 166 pending.complete(new PoolResource._(this)); | 165 pending.complete(new PoolResource._(this)); |
| 167 } else { | 166 } else { |
| 168 _allocatedResources--; | 167 _allocatedResources--; |
| 169 if (isClosed && _allocatedResources == 0) _closeGroup.close(); | 168 if (isClosed && _allocatedResources == 0) _closeGroup.close(); |
| 170 } | 169 } |
| 171 } | 170 } |
| 172 | 171 |
| 173 /// If there are any pending requests, this will fire the oldest one after | 172 /// If there are any pending requests, this will fire the oldest one after |
| 174 /// running [onRelease]. | 173 /// running [onRelease]. |
| 175 void _onResourceReleaseAllowed(onRelease()) { | 174 void _onResourceReleaseAllowed(onRelease()) { |
| 176 _resetTimer(); | 175 _resetTimer(); |
| 177 | 176 |
| 178 if (_requestedResources.isNotEmpty) { | 177 if (_requestedResources.isNotEmpty) { |
| 179 var pending = _requestedResources.removeFirst(); | 178 var pending = _requestedResources.removeFirst(); |
| 180 pending.complete(_runOnRelease(onRelease)); | 179 pending.complete(_runOnRelease(onRelease)); |
| 181 } else if (isClosed) { | 180 } else if (isClosed) { |
| 182 _closeGroup.add(new Future.sync(onRelease)); | 181 _closeGroup.add(new Future.sync(onRelease)); |
| 183 _allocatedResources--; | 182 _allocatedResources--; |
| 184 if (_allocatedResources == 0) _closeGroup.close(); | 183 if (_allocatedResources == 0) _closeGroup.close(); |
| 185 } else { | 184 } else { |
| 186 _onReleaseCallbacks.add( | 185 var zone = Zone.current; |
| 187 Zone.current.bindCallback(onRelease, runGuarded: false)); | 186 var registered = zone.registerCallback(onRelease); |
| 187 _onReleaseCallbacks.add(() => zone.run(registered)); |
| 188 } | 188 } |
| 189 } | 189 } |
| 190 | 190 |
| 191 /// Runs [onRelease] and returns a Future that completes to a resource once an | 191 /// Runs [onRelease] and returns a Future that completes to a resource once an |
| 192 /// [onRelease] callback completes. | 192 /// [onRelease] callback completes. |
| 193 /// | 193 /// |
| 194 /// Futures returned by [_runOnRelease] always complete in the order they were | 194 /// Futures returned by [_runOnRelease] always complete in the order they were |
| 195 /// created, even if earlier [onRelease] callbacks take longer to run. | 195 /// created, even if earlier [onRelease] callbacks take longer to run. |
| 196 Future<PoolResource> _runOnRelease(onRelease()) { | 196 Future<PoolResource> _runOnRelease(onRelease()) { |
| 197 new Future.sync(onRelease).then((value) { | 197 new Future.sync(onRelease).then((value) { |
| (...skipping 16 matching lines...) Expand all Loading... |
| 214 } else { | 214 } else { |
| 215 _timer.reset(); | 215 _timer.reset(); |
| 216 } | 216 } |
| 217 } | 217 } |
| 218 | 218 |
| 219 /// Handles [_timer] timing out by causing all pending resource completers to | 219 /// Handles [_timer] timing out by causing all pending resource completers to |
| 220 /// emit exceptions. | 220 /// emit exceptions. |
| 221 void _onTimeout() { | 221 void _onTimeout() { |
| 222 for (var completer in _requestedResources) { | 222 for (var completer in _requestedResources) { |
| 223 completer.completeError( | 223 completer.completeError( |
| 224 new TimeoutException("Pool deadlock: all resources have been " | 224 new TimeoutException( |
| 225 "Pool deadlock: all resources have been " |
| 225 "allocated for too long.", | 226 "allocated for too long.", |
| 226 _timeout), | 227 _timeout), |
| 227 new Chain.current()); | 228 new Chain.current()); |
| 228 } | 229 } |
| 229 _requestedResources.clear(); | 230 _requestedResources.clear(); |
| 230 _timer = null; | 231 _timer = null; |
| 231 } | 232 } |
| 232 } | 233 } |
| 233 | 234 |
| 234 /// A member of a [Pool]. | 235 /// A member of a [Pool]. |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 266 /// produce additional information later on. For example, an isolate's task | 267 /// produce additional information later on. For example, an isolate's task |
| 267 /// may be complete, but it could still emit asynchronous errors. | 268 /// may be complete, but it could still emit asynchronous errors. |
| 268 void allowRelease(onRelease()) { | 269 void allowRelease(onRelease()) { |
| 269 if (_released) { | 270 if (_released) { |
| 270 throw new StateError("A PoolResource may only be released once."); | 271 throw new StateError("A PoolResource may only be released once."); |
| 271 } | 272 } |
| 272 _released = true; | 273 _released = true; |
| 273 _pool._onResourceReleaseAllowed(onRelease); | 274 _pool._onResourceReleaseAllowed(onRelease); |
| 274 } | 275 } |
| 275 } | 276 } |
| OLD | NEW |