Kommunikationsmuster
Beim verteilten Rechnen werden häufig wiederkehrende Muster verwendet, um Daten zwischen den teilnehmenden Prozessen auszutauschen und zu synchronisieren. Im Folgenden werden einige solche Kommunikationsmuster1 erläutert, die die Grundlage für die Implementierung verteilten maschinellen Lernens bilden (Nielsen 2016). Für die Analyse und das Verständnis der Prozesse im verteilten maschinellen Lernens sind diese Muster unabdingbar.
Beschreibung der Kommunikationsmuster
Broadcast
Beim Broadcast sendet ein Prozess seine Daten an alle anderen Prozesse, welche jeweils ihre lokalen Daten mit den erhaltenen Werten überschreiben. Nach der Operation hat also jeder Prozess die Daten des sendenden Prozesses im Speicher.
Beispiel einer Broadcast-Operation: Prozess 1 sendet sein Datum $x_1$ an alle anderen Prozesse, welche jeweils ihre eigenen Daten mit diesem Datum $x_1$ überschreiben.
Reduce
Bei einer Reduce-Operation senden alle teilnehmenden Prozesse ihre Daten an einen bestimmten Zielprozess, welcher anschließend die Daten aller Prozesse anhand einer Reduktionsoperation auf jeweils einen Wert reduziert. Häufig verwendete Reduktionsoperationen sind Addition und Maximum. Nach der Reduktion erhält der Zielprozesses das Ergebnis der Reduktion der Daten aller Prozesse, wohingegen der Speicherinhalt aller anderen Prozesse unverändert bleibt.
Beispiel einer Reduce-Operation: Alle Prozesse senden ihre Daten an Prozess 1, welcher die Summe aller erhaltenen Daten berechnet. Der ursprüngliche Speicherinhalt von Prozess 1 wird mit der Summe überschrieben. Die Speicherinhalte der anderen Prozesse bleiben unverändert.
All-Reduce
Beim All-Reduce werden ähnlich wie bei der Reduce-Operation die Daten der teilnehmenden Prozesse auf jeweils ein Datum reduziert, allerdings erhalten alle Prozesse das Ergebnis.
Beispiel einer All-Reduce-Operation: Alle Prozesse tauschen untereinander ihre Daten aus und summieren sie auf. Jedes Prozesses erhält anschließend das gleiche Ergebnis.
Barrier
Eine Barrier (Barriere) ist ein Mechanismus zur zeitlichen Synchronisation von Prozessen. Beim Erreichen einer Barriere wird die Ausführung des Programms so lange pausiert, bis alle anderen Prozesse ebenfalls dieselbe Barriere erreicht haben.
Gather
Vor der Operation hat jeder Prozess jeweils ein Datum $x_i$ im Speicher. Im Zuge einer Gather-Operation werden die Daten aller $n$ Prozesse bei einem bestimmten Zielprozess gesammelt. Anschließend verfügt dieser Zielprozess über die Daten aller Prozesse $(x_1, …, x_n)$.
Beispiel einer Gather-Operation: Prozess 1 sammelt die Daten aller $n$ Prozesse und legt sie in einer Liste ab. Der Speicherinhalt der anderen Prozesse bleibt unverändert.
All-Gather
Bei einer All-Gather-Operation werden die Daten $x_i$ aller $n$ Prozesse auf allen Prozessen gesammelt. Dadurch erhalten alle teilnehmenden Prozesse jeweils Zugriff auf die Daten aller Prozesse in Form einer Liste $(x_1, …, x_n)$.
Beispiel einer All-Gather-Operation: Jeder Prozess erhält jeweils eine Kopie der Daten aller anderen Prozesse.
Scatter
Im Zuge der Scatter-Operation verteilt ein Prozess die Daten $(x_1, …, x_n)$ so auf alle $n$ teilnehmenden Prozesse (einschließlich des Senders), dass jeder Prozess exakt ein Datum aus der Liste erhält. Konkret erhält der $i$-te Prozess den $i$-ten Wert $x_i$. Scatter ist damit die Umkehrung der Gather-Operation.
Beispiel einer Scatter-Operation: Die Werte in einer Liste $(x_1, …, x_n)$ in Prozess 1 werden gleichmäßig auf alle Prozesse verteilt. Nach der Operation hat jeder der Prozesse jeweils einen Wert aus der Liste erhalten.
All-to-All
Im Vorfeld der All-to-All-Operation haben alle $n$ Prozesse jeweils eine Liste mit $n$ Elementen. Alle Prozesse verteilen ihre Daten im Zuge der All-to-All-Operation jeweils auf alle anderen Prozesse (siehe Scatter). Jene Prozesse sammeln wiederum alle empfangenen Werte und speichern sie in einer Liste der Größe $n$ ab (siehe Gather). Diese Operation ist vergleichbar mit dem Transponieren einer zweidimensionalen Matrix, deren Spalten auf mehrere Prozesse verteilt sind.
Beispiel einer All-to-All-Operation: Jeder Prozess verteilt seine lokalen Daten so auf alle anderen Prozesse, dass jeder Prozess $i$ das $i$-te Datum erhält. Gleichzeitig sammeln alle Prozesse die erhaltenen Daten in einer Liste. Beispielsweise sind in der Liste von Prozess 1 nach der Operation die ursprünglich ersten Werte aller Prozesse gespeichert.
Zusammenfassung
- Broadcast: Ein bestimmter Prozess sendet seine Daten an alle anderen Prozesse.
- Reduce: Die Daten aller Prozesse werden in einem bestimmten Prozess zusammengefasst.
- All-Reduce: Die Daten aller Prozesse werden in allen Prozessen zusammengefasst.
- Barrier: Aller Prozesse pausieren solange bis alle Prozesse die Barriere erreicht haben.
- Gather: Die einzelnen Werte aller Prozesse werden in einem bestimmten Prozess gesammelt.
- All-Gather: Die einzelnen Werte aller Prozesse werden in allen Prozessen gesammelt.
- Scatter: Die Daten eines Prozesses werden gleichmäßig auf alle Prozesse verteilt.
- All-to-All: Die Daten aller Prozesse werden gleichmäßig auf alle Prozesse verteilt, während gleichzeitig jeder Prozess die erhaltenen Daten in einer Liste sammelt.
PyTorch Distributed
Da die oben beschriebenen Kommunikationsmuster die Grundlage für die Implementierung verteilter KI-Architekturen bilden, wurden sie in Form des Pakets PyTorch Distributed in die im maschinellen Lernen beliebte Bibliothek PyTorch integriert. In diesem Kapitel erläutere ich die Verwendung dieser Muster mittels der von PyTorch bereitgestellten Funktionen.
PyTorch Distributed bildet das Fundament für wichtige Bausteine des parallelen Lernens in PyTorch, wie PyTorch DDP (Li et al. 2020) und PyTorch FSDP (Zhao et al. 2023). (Li 2024) erläutert detailliert die Implementierung datenparallelen Trainings-Aufbaus mit PyTorch. Die offizielle PyTorch-Dokumentation bietet zudem Detailinformationen zu den bereitgestellten Funktionen.
Die hier beschriebenen Kommunikationsmuster sind relativ low-level, das heißt ein ML-Ingenieur wird bei der Parallelisierung eines Modells mit PyTorch normalerweise nicht die oben beschriebenen Methoden verwenden, sondern kann auf abstraktere Werkzeuge wie PyTorch DDP oder FSDP zurückgreifen.
Beim Start der verteilten Anwendung muss PyTorch Distributed zunächst initialisiert werden. Dabei wird eine Gruppe erstellt, die alle an der Berechnung teilnehmenden Prozesse umfasst – die sog. “default group”. Die Initialisierung erfolgt mittels der Methode torch.distributed.init_process_group
, welche die Konfiguration standardmäßig aus Umgebungsvariablen ausliest. Die wichtigsten Umgebungsvariablen hierbei sind:
RANK
: der globale Rang des aktuellen Prozesses, vergleichbar mit einer globalen Prozess-ID,LOCAL_RANK
: der lokale Rang des aktuellen Prozesses auf dem jeweiligen Rechner, vergleichbar mit einer rechnerlokalen Prozess-ID,WORLD_SIZE
: die Anzahl aller teilnehmenden Prozesse,MASTER_ADDR
: die Adresse des Hauptprozesses, der die Verwaltung der Prozesse übernimmt, undMASTER_PORT
: der zugehörige Port des Hauptprozesses.
Alternativ können diese Daten auch als Argumente an torch.distributed.init_process_group
übermittelt werden.
|
|
Im Folgenden verwende ich für eine kompaktere Darstellung konsequent die Abkürzung dist
für torch.distributed
. PyTorch unterstützt mehrere Backends zur Inter-Prozess-Kommunikation (IPC) wie MPI, GLOO oder NCCL. In diesem Beispiel verwende ich GLOO als Backend, welches sich für das Testen am lokalen PC eignet. Die NVIDIA Collective Communications Library (NCCL) erfordert mindestens eine NVidia-GPU pro Prozess und für die Nutzung von MPI muss PyTorch zunächst mit MPI-Unterstützung kompiliert werden. Details zu den Backends kann man in diesem Tutorial nachlesen.
Für eine kompaktere Darstellung verwende ich im Folgenden die Methode create_data
, welche einen Tensor mit Daten in Abhängigkeit des aktuellen Rangs initialisiert und ggf. auf eine GPU transferiert. Bei der Verwendung von GPUs mittels CUDA muss beachtet werden, dass zwei miteinander über NCCL kommunizierende Prozesse auch unterschiedliche GPUs verwenden müssen.
|
|
Die Methode dist.broadcast
initiiert einen Broadcast der übergebenen Daten vom Prozess src
aus auf alle anderen Prozesse. src
bestimmt den Rang des sendenden Prozesses, in diesem Beispiel Prozess 0. Jeder Prozess alloziert zunächst einen Tensor data
gleicher Größe. Der sendende Prozess wird den Inhalt dieses Tensors an alle anderen Prozesse senden und die empfangenden Prozesse werden dessen Inhalt mit den erhaltenen Daten überschreiben. Nach Abschluss der Operation hat data
in allen Prozessen den gleichen Inhalt.
|
|
Eine Reduktion findet mittels der Methode dist.reduce
statt. Wie bei der Broadcast-Methode haben alle Prozesse bereits einen Tensor data
alloziert, dessen Inhalt sie an Prozess 0 schicken. Der Zielprozess wird mittels des Arguments dst
angegeben. Mittels des Arguments op
lässt sich die Form die Reduktionsoperation bestimmen.
|
|
Ein All-Reduce wird analog mittels der Methode dist.all_reduce
durchgeführt, doch entfällt hier das Argument dst
.
|
|
Zum Sammeln von Daten in einem Zielprozess ist die Methode dist.gather
vorgesehen. Diese nimmt als Eingabe wiederum einen bereits allozierten Tensor data
, den Rang des Zielprozesses dst
, sowie eine Liste von bereits in passender Größe allozierten Tensoren gather_list
, in welche die von den anderen Prozessen erhaltenen Tensoren gespeichert werden. gather_list
ist nur verpflichtend für den Zielprozess, alle anderen Prozesse brauchen dieses Argument nicht angeben.
|
|
Mittels dist.all_gather
wird analog zu dist.gather
eine All-Gather-Operation durchgeführt. Es entfällt wieder das Argument dst
.
|
|
Eine Scatter-Operation wird mittels dist.scatter
durchgeführt. Hierbei übergibt der sendende Prozess, hier 0, der Methode eine Liste mit jeweils einen Tensor für jeden Prozess. Dabei wird der Prozess mit dem Rang $i$ den Tensor data[i]
erhalten. Zunächst muss jeder Prozess einen Tensor passender Größe allozieren, hier result
, in den die empfangenen Daten geschrieben werden. Die Angabe des zweiten Argumentes ist nur für den sendenden Prozess notwendig.
|
|
dist.all_to_all
funktioniert analog zur Scatter-Operation, wobei allerdings jeder der $n$ Prozesse der Methode eine Liste der Länge $n$ mit zu sendenden Tensoren übergibt. Leider unterstützen nicht alle Backends diese Operation.
|
|
Die Methode dist.barrier()
blockiert den ausführenden Prozess solange, bis alle anderen Prozesse an derselben Stelle im Quellcode angelangt sind.
|
|
Standardmäßig kommunizieren die oben beschriebenen Methoden mit allen anderen Prozessen. Allerdings ist es nicht immer notwendig, alle Prozesse in die Kommunikation mit einzubeziehen. Mittels Angabe einer Gruppe von Prozessen über das Argument group
kann die Anzahl der durch eine Operation angesprochenen Prozesse eingeschränkt werden. Dazu muss zunächst eine Gruppe mit Hilfe von dist.new_group()
definiert werden. Dies erfordert allerdings die Beteiligung aller Prozesse, denn die Methode dist.new_group()
wird solange die Ausführung pausieren, bis alle Prozesse an dieser Stelle im Code angelangt sind. Der folgende Code erzeugt eine Gruppe mit allen Prozessen, deren Ränge gerade Zahlen sind:
|
|
Anschließend kann diese Gruppe verwendet werden, um eine lokale Synchronisation durchzuführen:
|
|
Hierbei ist natürlich nicht mehr die Beteiligung aller Prozesse erforderlich, sondern nur noch der Prozesse in der verwendeten Gruppe.
Ein Skript, welches PyTorch Distributed verwendet, wird üblicherweise mit torchrun
oder python -m torch.distributed.launch
(veraltet) gestartet. Diese Befehle ermöglichen es, gleich mehrere Prozesse auf einem Rechner oder mehreren Rechnern zu starten, wodurch die manuelle Erstellung von Prozessen entfällt. Zudem bieten sie Kommandozeilenoptionen an, um die nötigen Umgebungsvariablen zu konfigurieren. So wird beispielsweise mit --nnodes
die Anzahl der teilnehmenden Rechner bestimmt, --nproc-per-node
gibt die Anzahl der Prozesse pro Rechner an und --master-addr
dient der Angabe der Hauptprozessadresse (z.B. IP-Adresse oder Domain-Name).
Zur Demonstration der Konzepte habe ich dieses Skript geschrieben. Es führt die oben angesprochenen Methoden mit Beispieldaten hintereinander aus und protokolliert ausführlich die ausgeführten Operationen sowie die Ein- und Ausgabedaten. Ich habe es auf einem Laptop mit Ubuntu 23.10, Python 3.11 und PyTorch 2.0.1 getestet. Es kann einfach mittels
|
|
gestartet werden.
Referenzen
- Li, S., 2024. PyTorch Distributed Overview. Available at: https://pytorch.org/tutorials/beginner/dist_overview.html [Accessed March 8, 2024].
- Li, S. et al., 2020. PyTorch Distributed: Experiences on Accelerating Data Parallel Training. Proceedings of the VLDB Endowment, 13(12), pp.3005–3018. Available at: https://dl.acm.org/doi/10.14778/3415478.3415530 [Accessed June 19, 2023].
- Nielsen, F., 2016. Introduction to HPC with MPI for Data Science, Cham: Springer International Publishing. Available at: http://link.springer.com/10.1007/978-3-319-21903-5 [Accessed January 28, 2024].
- Zhao, Y. et al., 2023. PyTorch FSDP: Experiences on Scaling Fully Sharded Data Parallel. Available at: http://arxiv.org/abs/2304.11277 [Accessed April 28, 2023].