In diese Platine werden nun Scatter- und Gather-Bausteine eingebracht, um das Auslesen der Stichwörter zu parallelisieren, siehe Listing 10. Dazu wird der Baustein Alle_Stichwörter_ermitteln zweimal instanziert. Der Baustein erwartet eine Liste von Dateinamen am Eingang und liefert dazu eine Liste aller gefundenen Stichwörter am Ausgang. Das Ermitteln der Dateinamen wird vom Baustein Dateinamen_suchen übernommen. Für die Parallelisierung der Aufgabe wird daher der Ausgang von Dateinamen_suchen mit dem Eingang eines Scatter-Bausteins verbunden. Dieser sorgt dafür, dass die Eingangsdaten auf die beiden Ausgänge verteilt werden. An die beiden Ausgänge des Scatter-Bausteins werden daher die beiden Alle_Stichwörter_ermitteln-Bausteine gehängt. Um die Ergebnisse wieder zusammenzuführen, wird jeweils der Ausgang von Alle_Stichwörter_ermitteln mit einem Eingang eines Gather-Bausteins verbunden. Dieser sammelt die Ergebnisse und stellt sie an seinem Ausgang zur Verfügung. Abbildung 1 zeigt den Flow vor, Abbildung 2 nach der Parallelisierung.
[Abb. 1]
Vor der Parallelisierung ...
[Abb. 2]
... und nach der Parallelisierung.
Listing 10
Stichwörter ermitteln, parallel.
public class Eindeutige_Stichwörter_ermitteln_parallel { private readonly Action<Tuple<string, string>> process; public Eindeutige_Stichwörter_ ermitteln_parallel() { var dateinamen_suchen = new Dateinamen_suchen(); var alle_Stichwörter_ermitteln1 = new Alle_Stichwörter_ermitteln(); var alle_Stichwörter_ermitteln2 = new Alle_Stichwörter_ermitteln(); var eindeutige_Stichwörter_filtern = new Eindeutige_Stichwörter_filtern(); var scatter = new Scatter<string>(); var gather = new Gather<string>(); dateinamen_suchen.Result += scatter.Process; scatter.Output1 += alle_Stichwörter_ ermitteln1.Process; scatter.Output2 += alle_Stichwörter_ ermitteln2.Process; alle_Stichwörter_ermitteln1.Result += gather.Input1; alle_Stichwörter_ermitteln2.Result += gather.Input2; gather.Result += eindeutige_Stichwörter_ filtern.Process; eindeutige_Stichwörter_filtern.Result += x => Result(x); process = path_und_SearchPattern => dateinamen_suchen.Process( path_und_SearchPattern); } public void Process(Tuple<string, string> path_und_SearchPattern) { process(path_und_SearchPattern); } public event Action<IEnumerable< string>> Result; }
Zu beachten ist, dass der Flow, der durch die Platine realisiert wird, durch die Verwendung von Scatter und Gather asynchron abläuft. Nach Aufruf der Process-Methode der Platine kehrt der Kontrollfluss zum Aufrufer zurück, während der Flow der Platine asynchron auf weiteren Threads noch läuft. Auch der Result- Event der Platine wird asynchron auf einem anderen Thread ausgelöst.
Die Parallelisierung der Stichwortsuche zeigt deutliche Geschwindigkeitsvorteile. Das ist natürlich nicht in jedem Fall so. Ein triviales Beispiel, bei dem einfach nur Zahlen multipliziert werden, wird durch die Parallelisierung nicht beschleunigt. Im Gegenteil! Durch den Overhead des Multithreadings wird der Durchsatz sogar verringert.
Realisierung
Die Grundidee für den Scatter-Baustein besteht darin, ein und denselben Enumerator in den beiden Output-Events zu verwenden. Und natürlich werden die beiden Output-Events auf je einem eigenen Thread ausgelöst.
Normalerweise wird beim Iterieren mit dem foreach-Sprachkonstrukt gearbeitet:
var ints = new[] { 1, 2, 3, 4 }; foreach(var i in ints) {...}
Hinter den Kulissen wird dies vom C#- Compiler in Aufrufe übersetzt, die in IEnumerable und IEnumerator definiert sind:
var enumerator = ints.GetEnumerator(); while(enumerator.MoveNext()) { var i = enumerator.Current; // Mache etwas mit i }
Wichtig ist hier festzuhalten, dass für die Schleife mit GetEnumerator eine neue Instanz eines Enumerators erzeugt wird. Jeder Aufruf von GetEnumerator liefert eine neue Instanz eines Enumerators! Insofern wäre es beim Scatter-Baustein schwierig, in jedem der beiden Ausgangsevents mit foreach über die Eingangsdaten zu iterieren, weil dann zwei Enumeratoren im Spiel wären. Beide würden jeweils von vorn nach hinten über die Eingangsdaten iterieren. Jedes Element der Eingangsdaten soll aber nur genau einmal an einem der beiden Ausgänge anstehen. Das wird erreicht, indem beide Ausgänge dieselbe Instanz des Enumerators verwenden, siehe Listing 11.
Listing 11
Der Scatter-Baustein.
public class Scatter<T> { public void Process(IEnumerable<T> input) { var enumerator = input.GetEnumerator(); var thread1 = new Thread(() => Output1( GenerateOutput(enumerator))); var thread2 = new Thread(() => Output2( GenerateOutput(enumerator))); thread1.Start(); thread2.Start(); } private IEnumerable<T> GenerateOutput( IEnumerator<T> enumerator) { while (enumerator.MoveNext()) { yield return enumerator.Current; } } public event Action<IEnumerable<T>> Output1; public event Action<IEnumerable<T>> Output2; }
Der Enumerator wird einmal mit Get- Enumerator instanziert und dann beide Male an die Methode GenerateOutput übergeben. Diese iteriert mithilfe des Enumerators über die Eingangsdaten und liefert sie mit yield return als neue Aufzählung zurück. Auf diese Weise werden die einzelnen Elemente der Eingangsdaten nur jeweils einmal an einen der beiden Ausgänge verteilt.
Listing 12 zeigt die Tests für den Scatter- Baustein.
Listing 12
Scatter testen.
[TestFixture] public class ScatterTests { private Scatter<int> sut; private IEnumerable<int> result1; private IEnumerable<int> result2; [SetUp] public void Setup() { sut = new Scatter<int>(); sut.Output1 += x => result1 = x; sut.Output2 += x => result2 = x; } [Test] public void Jedes_Element_steht_genau_ einmal_an_einem_der_Ausgänge() { sut.Process(new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); Assert.That(result1.Union(result2), Is.EquivalentTo(new[] {1,2,3,4,5,6,7,8,9,10})); } [Test] public void Die_beiden_Ausgänge_laufen_ nicht_auf_dem_Haupthread() { var mainThread = Thread.CurrentThread; sut.Output1 += _ => Assert.That(Thread.CurrentThread, Is.Not.SameAs(mainThread)); sut.Output2 += _ => Assert.That(Thread.CurrentThread, Is.Not.SameAs(mainThread)); sut.Process(new[]{1}); } [Test] public void Die_beiden_Ausgänge_laufen_ auf_unterschiedlichen_Threads() { Thread thread1 = null; Thread thread2 = null; sut.Output1 += _ => thread1 = Thread.CurrentThread; sut.Output2 += _ => thread2 = Thread.CurrentThread; sut.Process(new[]{1}); Assert.That(thread1, Is.Not.SameAs(thread2)); } }
Der erste Test überprüft, ob die beiden Aufzählungen, die an den Ausgängen des Scatter-Bausteins gebildet werden, zusammengenommen der Aufzählung des Eingangs entsprechen. Da es durch die Ausführung auf mehreren Threads zu Veränderungen der Reihenfolge kommen kann, erfolgt die überprüfung mit Is.EquivalentTo. Die beiden anderen Tests prüfen, ob die Ausgänge tatsächlich auf einem anderen als dem Mainthread ablaufen und ob wirklich beide Ausgänge einen eigenen Thread erhalten.
Gather
Aufgabe des Gather-Bausteins ist es, die Aufzählungen, die an den beiden Eingängen anstehen, zu einer Aufzählung für den Ausgang zusammenzufassen. Dabei wird der Ausgang auf einem eigenen Thread ausgeführt, damit der Baustein