Large Message Handling mit Kafka: Chunking vs. External Store

Von Adam Kotwasinski, Senior Software Development Engineer, Workday

Hintergrund

Mehrere Dienste in Workday verwenden Kafka als Messaging-Bus (https://kafka.apache.org/uses) und übertragen verschiedene Arten von Daten. Leider legt Kafka ein Limit für die Größe der Payload fest, die an den Broker gesendet werden kann (im Vergleich zu RabbitMQ, das kein solches Limit hat). Durch Aufheben des Grenzwerts kann der Business-Layer-Code das Geschäftsproblem lösen, ohne dass Kafka-spezifische Verhaltensweisen in den Anwendungscode gelangen.

Wenn die Nachricht größer ist als der vom Broker akzeptierte Wert, gibt der Kafka-Produzent diese Ausnahme zurück:

Implementierung 1 - Chunking

Große Nutzlasten können in mehrere kleinere Blöcke aufgeteilt werden, die von Brokern akzeptiert werden können.

Die Chunks können in Kafka genauso gespeichert werden wie normale (nicht-chunked) Nachrichten. Der einzige Unterschied besteht darin, dass der Verbraucher die Blöcke behalten und zu einer echten Nachricht zusammenfassen muss, wenn alle Blöcke gesammelt wurden.

Die Blöcke im Kafka-Protokoll können mit normalen Nachrichten verwoben werden.

Für eine Nachricht, die in drei Teile aufgeteilt ist, könnte der Konsument beispielsweise bereits einen der Teile konsumiert haben:

Nachdem der Verbraucher weitere Nachrichten erhalten hat, können wir sehen, dass der zweite Block empfangen wurde:

Nachdem der dritte und letzte Block empfangen wurde, kann die Nachricht schließlich kombiniert und an den Endbenutzer zurückgegeben werden, und der Cache kann aufgeräumt werden:

Die Blockgröße ergibt sich aus der Kafka-Herstellerkonfiguration (max.request.size).

Jeder der Chunks wird zusammen mit diesen Metadaten gesendet:

  • Nachricht uuid
  • Die Position des Blocks in der Nutzlast
  • Die Gesamtzahl der Chunks

Chunk-Payload-Format

Nachrichten senden

Die Hauptklassen für diese Funktionalität sind Message Producer und Message Chunker.

Der Code, der die Chunking-Nutzdaten (zusammen mit den obigen Headern) generiert, ist relativ einfach. In Anbetracht der Nutzlast und der maximal zulässigen Größe teilen wir diese in ungefähr payload.size / chunkSize Anzahl von Chunks auf:

Der von den Sendemethoden zurückgegebene Offset muss auf den niedrigsten Offset verweisen, der allen Chunks zugewiesen ist. Jedes andere Ergebnis würde es unmöglich machen, nach dem ersten Teil zu suchen:

Mit der Kafka Producer-API kann der Benutzer die Partition der Nachricht anhand des Nachrichtenschlüssels berechnen. Das Ergebnis basiert auf der Anzahl der Partitionen, die derzeit im Cluster gehostet werden (DefaultPartitioner in Kafka 1.0). Im Moment sendet die Implementierung die Chunks an Partition 0, wenn die Partition nicht angegeben wurde. In zukünftigen Versionen möchten wir es möglicherweise ändern, indem wir die Partitionen lesen, die von der ersten Chunksendeoperation zurückgegeben wurden, und andere Chunks an dieselbe Partition senden.

Andernfalls könnte es vorkommen, dass Konsumenten, die von einer einzelnen Partition Gebrauch machen, niemals eine vollständige Nachricht erhalten, da die Chunks auf mehrere Partitionen verteilt sind:

Nachrichten empfangen

Die Hauptklassen für diese Funktionalität sind abstract consumer und message chunker.

Wenn eine Kafka-Nachricht, die einen Block enthält, empfangen wird, wird sie lokal gespeichert und nicht an den Benutzer zurückgegeben (da es keinen Vorteil hätte, nur einen Teil der Nutzlast abzurufen). Erst wenn alle Chunks gesammelt wurden, werden sie zu einer einzigen Nachricht zusammengefasst und an den Benutzer zurückgegeben.

Das Bereinigen dieses Speichers kann nach Offset-Änderungsvorgängen erforderlich sein, da das Suchen nach einer Position zwischen Abschnitten eine vollständige Nachricht hätte zurückgeben können, wenn der Benutzer beabsichtigt hätte, zu einem späteren Punkt (nach dem Kopf der Nachricht) zu suchen. Beispiel: Für eine Sechs-Chunk-Nachricht haben wir bereits die Chunks 1, 2 und 3 erhalten. Nachdem wir erneut versucht haben, drei zu positionieren, haben wir die Chunks 3 (erneut), 4, 5 und 6 (die neuen) verbraucht. Dies bedeutet, dass alle Chunks empfangen wurden, während die Chunks 1 und 2 vor dem Suchvorgang empfangen wurden und dem Benutzer nicht zur Verfügung gestellt werden sollten. In den vorhergehenden Diagrammen wäre der zu suchende Versatz "N + 1".

Derzeit ist der Chunk-Store als Java-In-Memory-Map implementiert.

Das Gruppenmanagement ist eine Herausforderung für Block-basierte Lösungen. Es ist möglich, dass sich eine Verbrauchergruppe neu zusammensetzt, während einige der Teile bereits vom alten Verbraucher erhalten wurden. Wir würden also damit enden, dass ein (alter) Verbraucher anfängliche Teile speichert und der andere (neue) Verbraucher die verbleibenden Teile erhält. Mögliche Lösungen hierfür sind:

  • Alternative 1 - neuer Verbraucher - sucht bei dem Neuausgleichsereignis, wenn der Teil, den wir erhalten haben, nicht der erste war.
  • Alternative 2 - alter Verbraucher - Starten eines parallelen Verbrauchers ohne Gruppenzuordnung, der für das Abrufen der verbleibenden Brocken verantwortlich ist (Art „Endbearbeitungsmechanismus“).
  • Alternative 3 - das Teilen von Chunks zwischen Konsumenten - wäre eine verteilte Cache-Lösung und ein Abenteuer für sich.

Implementierung 2 - Externer Speicher

Anstatt eine große Nutzlast über Kafka zu senden, könnten wir einen anderen Ansatz wählen: Speichern Sie die reale Nutzlast in einem externen Datenspeicher und übertragen Sie nur den Zeiger auf diese Daten. Der Empfänger würde dann diese Art von Zeigernutzlast erkennen, die Daten aus dem externen Speicher transparent lesen und sie dem Endbenutzer bereitstellen.

Pointer Envelope Payload Format

Generisches IoC

Der sendende Teil ist dann dafür verantwortlich, die Nutzdaten zu speichern und den Zeiger über Kafka zu senden:

  • Payload in externem Speicher speichern (über die vom Provider implementierte Write-Methode)
  • Generieren Sie den Umschlag, der die Zeiger-Nutzdaten umschließt, und verwenden Sie dabei dieselben Kafka-Metadaten
    als Original (Thema, Partition, Schlüssel)
  • Senden Sie es über Kafka

Kurz gesagt, die Nachrichten in Kafka beziehen sich auf die Einträge im externen Geschäft:

Der externe Speicher muss drei Methoden implementieren: Schreiben, Lesen und Zurücksetzen (bei fehlgeschlagenen Transaktionen):

Die Hauptklasse für diese Funktionalität ist Message Chunker:

Der empfangende Teil wäre dann für die Wiederherstellung der Nutzdaten verantwortlich:

  • Überprüfen, ob die empfangene Nachricht wirklich eine Zeiger-Nutzlast ist (durch Überprüfen des Nutzlast-Headers und der Nutzlast-Version).
  • Extrahieren der realen Zeigernutzlast (generiert vom externen Speicher).
  • Übergabe der Zeiger-Payload an den externen Speicher über die Methode "read", um die tatsächliche Payload bereitzustellen.
  • Rückgabe des Ergebnisses an den Benutzer mit denselben Metadaten (Thema, Partitionen, Offset, Schlüssel) wie die ursprünglich empfangene Nachricht.

Bei Kafka-Send-Fehlern muss der externe Speicher zurückgesetzt werden. Die vom Speicher während des Schreibvorgangs generierten Nutzdaten werden als Argument für die "Rollback" -Methode bereitgestellt. Es liegt dann in der Verantwortung des externen Geschäfts, die erforderlichen Aufräumarbeiten durchzuführen:

kafka-over-redis

In dieser Implementierung haben wir Redis als externen Datenspeicher verwendet, es können jedoch auch andere Arten von persistentem Speicher (wie RDBMS) verwendet werden. Die Implementierung ist als redis-backed external store vorhanden.

Um Nutzdaten eindeutig zu erkennen, basiert jeder Redis-Schlüssel auf der UUID der Originalnachricht und dem physischen Kafka-Namen des Ziels:

Bei Redis beträgt die maximale Nutzlastgröße, die als einzelner Eintrag gespeichert werden kann, 512 MB. Nutzlasten, die größer als 512 MB sind, müssen als mehrere Einträge gespeichert werden.

Der Schlüssel ist nur der (oben berechnete) keyStr mit der Segmentnummer. Derzeit muss die Ablaufrichtlinie für Redis und Kafka separat gepflegt werden (siehe "this.dataExpiry" weiter unten).

Da wir die Nutzdaten in Redis als mehr als einen Eintrag hätten speichern können, müssen wir die Anzahl der Segmente in der Antwortnutzdaten angeben:

Das folgende Diagramm zeigt, wie 3 Nachrichten mit jeweils 2, 3 oder 4 Segmenten gespeichert werden können:

Beim Empfang der realen Nutzlast werden nur N Segmente mit dem aus dem empfangenen Argument extrahierten Schlüssel gelesen:

Die empfangenen Segmente werden dann zusammengeführt und an den übergeordneten Client zurückgegeben.

Ein Rollback-Schritt ist einem Leseschritt sehr ähnlich. Anstatt Nutzdaten zu lesen, werden sie gelöscht.

Vergleich

Diese Tabelle fasst die Unterschiede zusammen: