Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(486)

Side by Side Diff: packages/barback/lib/src/graph/phase.dart

Issue 3015713002: Roll to pickup pool changes
Patch Set: Created 3 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library barback.graph.phase; 5 library barback.graph.phase;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 8
9 import 'package:collection/collection.dart'; 9 import 'package:collection/collection.dart';
10 10
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after
85 final _streams = new NodeStreams(); 85 final _streams = new NodeStreams();
86 Stream<NodeStatus> get onStatusChange => _streams.onStatusChange; 86 Stream<NodeStatus> get onStatusChange => _streams.onStatusChange;
87 Stream<AssetNode> get onAsset => _streams.onAsset; 87 Stream<AssetNode> get onAsset => _streams.onAsset;
88 Stream<LogEntry> get onLog => _streams.onLog; 88 Stream<LogEntry> get onLog => _streams.onLog;
89 89
90 /// How far along [this] is in processing its assets. 90 /// How far along [this] is in processing its assets.
91 NodeStatus get status { 91 NodeStatus get status {
92 // Before any transformers are added, the phase should be dirty if and only 92 // Before any transformers are added, the phase should be dirty if and only
93 // if any input is dirty. 93 // if any input is dirty.
94 if (_classifiers.isEmpty && _groups.isEmpty && previous == null) { 94 if (_classifiers.isEmpty && _groups.isEmpty && previous == null) {
95 return _inputs.any((input) => input.state.isDirty) ? 95 return _inputs.any((input) => input.state.isDirty)
96 NodeStatus.RUNNING : NodeStatus.IDLE; 96 ? NodeStatus.RUNNING
97 : NodeStatus.IDLE;
97 } 98 }
98 99
99 var classifierStatus = NodeStatus.dirtiest( 100 var classifierStatus = NodeStatus
100 _classifiers.values.map((classifier) => classifier.status)); 101 .dirtiest(_classifiers.values.map((classifier) => classifier.status));
101 var groupStatus = NodeStatus.dirtiest( 102 var groupStatus =
102 _groups.values.map((group) => group.status)); 103 NodeStatus.dirtiest(_groups.values.map((group) => group.status));
103 return (previous == null ? NodeStatus.IDLE : previous.status) 104 return (previous == null ? NodeStatus.IDLE : previous.status)
104 .dirtier(classifierStatus) 105 .dirtier(classifierStatus)
105 .dirtier(groupStatus); 106 .dirtier(groupStatus);
106 } 107 }
107 108
108 /// The previous phase in the cascade, or null if this is the first phase. 109 /// The previous phase in the cascade, or null if this is the first phase.
109 final Phase previous; 110 final Phase previous;
110 111
111 /// The subscription to [previous]'s [onStatusChange] stream. 112 /// The subscription to [previous]'s [onStatusChange] stream.
112 StreamSubscription _previousStatusSubscription; 113 StreamSubscription _previousStatusSubscription;
(...skipping 14 matching lines...) Expand all
127 Set<AssetNode> get availableOutputs { 128 Set<AssetNode> get availableOutputs {
128 return _outputs.values 129 return _outputs.values
129 .map((output) => output.output) 130 .map((output) => output.output)
130 .where((node) => node.state.isAvailable) 131 .where((node) => node.state.isAvailable)
131 .toSet(); 132 .toSet();
132 } 133 }
133 134
134 // TODO(nweiz): Rather than passing the cascade and the phase everywhere, 135 // TODO(nweiz): Rather than passing the cascade and the phase everywhere,
135 // create an interface that just exposes [getInput]. Emit errors via 136 // create an interface that just exposes [getInput]. Emit errors via
136 // [AssetNode]s. 137 // [AssetNode]s.
137 Phase(AssetCascade cascade, String location) 138 Phase(AssetCascade cascade, String location) : this._(cascade, location, 0);
138 : this._(cascade, location, 0);
139 139
140 Phase._(this.cascade, this._location, this._index, [this.previous]) { 140 Phase._(this.cascade, this._location, this._index, [this.previous]) {
141 if (previous != null) { 141 if (previous != null) {
142 _previousOnAssetSubscription = previous.onAsset.listen(addInput); 142 _previousOnAssetSubscription = previous.onAsset.listen(addInput);
143 _previousStatusSubscription = previous.onStatusChange 143 _previousStatusSubscription =
144 .listen((_) => _streams.changeStatus(status)); 144 previous.onStatusChange.listen((_) => _streams.changeStatus(status));
145 } 145 }
146 146
147 onStatusChange.listen((status) { 147 onStatusChange.listen((status) {
148 if (status == NodeStatus.RUNNING) return; 148 if (status == NodeStatus.RUNNING) return;
149 149
150 // All the previous phases have finished declaring or producing their 150 // All the previous phases have finished declaring or producing their
151 // outputs. If anyone's still waiting for outputs, cut off the wait; we 151 // outputs. If anyone's still waiting for outputs, cut off the wait; we
152 // won't be generating them, at least until a source asset changes. 152 // won't be generating them, at least until a source asset changes.
153 for (var completer in _pendingOutputRequests.values) { 153 for (var completer in _pendingOutputRequests.values) {
154 completer.complete(null); 154 completer.complete(null);
155 } 155 }
156 _pendingOutputRequests.clear(); 156 _pendingOutputRequests.clear();
157 }); 157 });
158 } 158 }
159 159
160 /// Adds a new asset as an input for this phase. 160 /// Adds a new asset as an input for this phase.
161 /// 161 ///
162 /// [node] doesn't have to be [AssetState.AVAILABLE]. Once it is, the phase 162 /// [node] doesn't have to be [AssetState.AVAILABLE]. Once it is, the phase
163 /// will automatically begin determining which transforms can consume it as a 163 /// will automatically begin determining which transforms can consume it as a
164 /// primary input. The transforms themselves won't be applied until [process] 164 /// primary input. The transforms themselves won't be applied until [process]
165 /// is called, however. 165 /// is called, however.
166 /// 166 ///
167 /// This should only be used for brand-new assets or assets that have been 167 /// This should only be used for brand-new assets or assets that have been
168 /// removed and re-created. The phase will automatically handle updated assets 168 /// removed and re-created. The phase will automatically handle updated assets
169 /// using the [AssetNode.onStateChange] stream. 169 /// using the [AssetNode.onStateChange] stream.
170 void addInput(AssetNode node) { 170 void addInput(AssetNode node) {
171 // Each group is one channel along which an asset may be forwarded, as is 171 // Each group is one channel along which an asset may be forwarded, as is
172 // each transformer. 172 // each transformer.
173 var forwarder = new PhaseForwarder( 173 var forwarder =
174 node, _classifiers.length, _groups.length); 174 new PhaseForwarder(node, _classifiers.length, _groups.length);
175 _forwarders[node.id] = forwarder; 175 _forwarders[node.id] = forwarder;
176 forwarder.onAsset.listen(_handleOutputWithoutForwarder); 176 forwarder.onAsset.listen(_handleOutputWithoutForwarder);
177 if (forwarder.output != null) { 177 if (forwarder.output != null) {
178 _handleOutputWithoutForwarder(forwarder.output); 178 _handleOutputWithoutForwarder(forwarder.output);
179 } 179 }
180 180
181 _inputOrigins.add(node.origin); 181 _inputOrigins.add(node.origin);
182 _inputs.add(node); 182 _inputs.add(node);
183 _inputSubscriptions.add(node.onStateChange.listen((state) { 183 _inputSubscriptions.add(node.onStateChange.listen((state) {
184 if (state.isRemoved) { 184 if (state.isRemoved) {
(...skipping 11 matching lines...) Expand all
196 // TODO(nweiz): If the output is available when this is called, it's 196 // TODO(nweiz): If the output is available when this is called, it's
197 // theoretically possible for it to become unavailable between the call and 197 // theoretically possible for it to become unavailable between the call and
198 // the return. If it does so, it won't trigger the rebuilding process. To 198 // the return. If it does so, it won't trigger the rebuilding process. To
199 // avoid this, we should have this and the methods it calls take explicit 199 // avoid this, we should have this and the methods it calls take explicit
200 // callbacks, as in [AssetNode.whenAvailable]. 200 // callbacks, as in [AssetNode.whenAvailable].
201 /// Gets the asset node for an output [id]. 201 /// Gets the asset node for an output [id].
202 /// 202 ///
203 /// If [id] is for a generated or transformed asset, this will wait until it 203 /// If [id] is for a generated or transformed asset, this will wait until it
204 /// has been created and return it. This means that the returned asset will 204 /// has been created and return it. This means that the returned asset will
205 /// always be [AssetState.AVAILABLE]. 205 /// always be [AssetState.AVAILABLE].
206 /// 206 ///
207 /// If the output cannot be found, returns null. 207 /// If the output cannot be found, returns null.
208 Future<AssetNode> getOutput(AssetId id) { 208 Future<AssetNode> getOutput(AssetId id) {
209 return new Future.sync(() { 209 return new Future.sync(() {
210 if (id.package != cascade.package) return cascade.graph.getAssetNode(id); 210 if (id.package != cascade.package) return cascade.graph.getAssetNode(id);
211 if (_outputs.containsKey(id)) { 211 if (_outputs.containsKey(id)) {
212 var output = _outputs[id].output; 212 var output = _outputs[id].output;
213 // If the requested output is available, we can just return it. 213 // If the requested output is available, we can just return it.
214 if (output.state.isAvailable) return output; 214 if (output.state.isAvailable) return output;
215 215
216 // If the requested output exists but isn't yet available, wait to see 216 // If the requested output exists but isn't yet available, wait to see
217 // if it becomes available. If it's removed before becoming available, 217 // if it becomes available. If it's removed before becoming available,
218 // try again, since it could be generated again. 218 // try again, since it could be generated again.
219 output.force(); 219 output.force();
220 return output.whenAvailable((_) { 220 return output.whenAvailable((_) {
221 return output; 221 return output;
222 }).catchError((error) { 222 }).catchError((error) {
223 if (error is! AssetNotFoundException) throw error; 223 if (error is! AssetNotFoundException) throw error;
224 return getOutput(id); 224 return getOutput(id);
225 }); 225 });
226 } 226 }
227 227
228 // If this phase and the previous phases are fully declared or done, the 228 // If this phase and the previous phases are fully declared or done, the
229 // requested output won't be generated and we can safely return null. 229 // requested output won't be generated and we can safely return null.
230 if (status != NodeStatus.RUNNING) return null; 230 if (status != NodeStatus.RUNNING) return null;
231 231
232 // Otherwise, store a completer for the asset node. If it's generated in 232 // Otherwise, store a completer for the asset node. If it's generated in
233 // the future, we'll complete this completer. 233 // the future, we'll complete this completer.
234 var completer = _pendingOutputRequests.putIfAbsent(id, 234 var completer =
235 () => new Completer.sync()); 235 _pendingOutputRequests.putIfAbsent(id, () => new Completer.sync());
236 return completer.future; 236 return completer.future;
237 }); 237 });
238 } 238 }
239 239
240 /// Set this phase's transformers to [transformers]. 240 /// Set this phase's transformers to [transformers].
241 void updateTransformers(Iterable transformers) { 241 void updateTransformers(Iterable transformers) {
242 var newTransformers = transformers 242 var newTransformers = transformers
243 .where((op) => op is Transformer || op is AggregateTransformer) 243 .where((op) => op is Transformer || op is AggregateTransformer)
244 .toSet(); 244 .toSet();
245 var oldTransformers = _classifiers.keys.toSet(); 245 var oldTransformers = _classifiers.keys.toSet();
246 for (var removed in oldTransformers.difference(newTransformers)) { 246 for (var removed in oldTransformers.difference(newTransformers)) {
247 _classifiers.remove(removed).remove(); 247 _classifiers.remove(removed).remove();
248 } 248 }
249 249
250 for (var transformer in newTransformers.difference(oldTransformers)) { 250 for (var transformer in newTransformers.difference(oldTransformers)) {
251 var classifier = new TransformerClassifier( 251 var classifier =
252 this, transformer, "$_location.$_index"); 252 new TransformerClassifier(this, transformer, "$_location.$_index");
253 _classifiers[transformer] = classifier; 253 _classifiers[transformer] = classifier;
254 classifier.onAsset.listen(_handleOutput); 254 classifier.onAsset.listen(_handleOutput);
255 _streams.onLogPool.add(classifier.onLog); 255 _streams.onLogPool.add(classifier.onLog);
256 classifier.onStatusChange.listen((_) => _streams.changeStatus(status)); 256 classifier.onStatusChange.listen((_) => _streams.changeStatus(status));
257 for (var input in _inputs) { 257 for (var input in _inputs) {
258 classifier.addInput(input); 258 classifier.addInput(input);
259 } 259 }
260 } 260 }
261 261
262 var newGroups = DelegatingSet.typed/*<TransformerGroup>*/( 262 var newGroups = DelegatingSet.typed<TransformerGroup>(
263 transformers.where((op) => op is TransformerGroup).toSet()); 263 transformers.where((op) => op is TransformerGroup).toSet());
264 var oldGroups = _groups.keys.toSet(); 264 var oldGroups = _groups.keys.toSet();
265 for (var removed in oldGroups.difference(newGroups)) { 265 for (var removed in oldGroups.difference(newGroups)) {
266 _groups.remove(removed).remove(); 266 _groups.remove(removed).remove();
267 } 267 }
268 268
269 for (var added in newGroups.difference(oldGroups)) { 269 for (var added in newGroups.difference(oldGroups)) {
270 var runner = new GroupRunner(previous, added, "$_location.$_index"); 270 var runner = new GroupRunner(previous, added, "$_location.$_index");
271 _groups[added] = runner; 271 _groups[added] = runner;
272 runner.onAsset.listen(_handleOutput); 272 runner.onAsset.listen(_handleOutput);
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after
347 } 347 }
348 } 348 }
349 349
350 /// Add [asset] as an output of this phase without checking if it's a 350 /// Add [asset] as an output of this phase without checking if it's a
351 /// forwarded asset. 351 /// forwarded asset.
352 void _handleOutputWithoutForwarder(AssetNode asset) { 352 void _handleOutputWithoutForwarder(AssetNode asset) {
353 if (_outputs.containsKey(asset.id)) { 353 if (_outputs.containsKey(asset.id)) {
354 _outputs[asset.id].add(asset); 354 _outputs[asset.id].add(asset);
355 } else { 355 } else {
356 _outputs[asset.id] = new PhaseOutput(this, asset, "$_location.$_index"); 356 _outputs[asset.id] = new PhaseOutput(this, asset, "$_location.$_index");
357 _outputs[asset.id].onAsset.listen(_emit, 357 _outputs[asset.id]
358 onDone: () => _outputs.remove(asset.id)); 358 .onAsset
359 .listen(_emit, onDone: () => _outputs.remove(asset.id));
359 _emit(_outputs[asset.id].output); 360 _emit(_outputs[asset.id].output);
360 } 361 }
361 362
362 var exception = _outputs[asset.id].collisionException; 363 var exception = _outputs[asset.id].collisionException;
363 if (exception != null) cascade.reportError(exception); 364 if (exception != null) cascade.reportError(exception);
364 } 365 }
365 366
366 /// Emit [asset] as an output of this phase. 367 /// Emit [asset] as an output of this phase.
367 /// 368 ///
368 /// This should be called after [_handleOutput], so that collisions are 369 /// This should be called after [_handleOutput], so that collisions are
(...skipping 11 matching lines...) Expand all
380 381
381 if (asset.state.isAvailable) { 382 if (asset.state.isAvailable) {
382 request.complete(asset); 383 request.complete(asset);
383 return; 384 return;
384 } 385 }
385 386
386 // A lazy asset may be emitted while still dirty. If so, we wait until it's 387 // A lazy asset may be emitted while still dirty. If so, we wait until it's
387 // either available or removed before trying again to access it. 388 // either available or removed before trying again to access it.
388 assert(asset.state.isDirty); 389 assert(asset.state.isDirty);
389 asset.force(); 390 asset.force();
390 asset.whenStateChanges().then((state) { 391 asset
391 if (state.isRemoved) return getOutput(asset.id); 392 .whenStateChanges()
392 return asset; 393 .then((state) {
393 }) 394 if (state.isRemoved) return getOutput(asset.id);
394 .then((asset) => request.complete(asset)) 395 return asset;
395 .catchError(request.completeError); 396 })
397 .then((asset) => request.complete(asset))
398 .catchError(request.completeError);
396 } 399 }
397 400
398 String toString() => "phase $_location.$_index"; 401 String toString() => "phase $_location.$_index";
399 } 402 }
OLDNEW
« no previous file with comments | « packages/barback/lib/src/graph/package_graph.dart ('k') | packages/barback/lib/src/graph/phase_output.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698