MongoDB Messaging via Morphium

veröffentlicht am : So, 06. 05. 2018 geändert am: Mo, 30. 07. 2018

Schlagworte: java programming morphium


Einer der vielen Vorteile von Morphium ist das integrierte Messagingsystem, welches z.B. auch für die Synchronisierung der Caches benutzt wird. Das funktioniert schon so seit mit einer der erste Versionen von Morphium.

Das Messaging setzt dabei auf ein ausgeklügeltes "Locking", damit Nachrichten, die nur für einen Empfänger bestimmt sind, auch nur dort ankommen. Leider kann man so etwas im Normalfall nur durch Polling, d.h. immer wieder die selbe Anfrage an die DB senden, realisieren. Aber seit der V3.2.0 wird im Falle der Verwendung eines Replicasets der OplogMonitor genutzt, um Messaging auf eine art "Push" aufzusetzen. D.h. die DB informiert die Clients, wenn es neue Nachrichten gibt.

Das reduziert die Last und erhöht die Reaktionszeit. Schauen wir uns das im Detail mal an...

Messaging mit Morphium - Funktionsweise

Wie einleitend schon erwähnt, muss man ab der V3.2.0 2 Fälle unterscheiden: ist Morphium mit einem ReplicaSet verbunden oder nicht.

Kein Replicaset => Polling

Das gitl im übrigen auch, für einen Sharded Cluster! D.h. wenn man mit dem MongoS verbunden ist, funktioniert das Messaging via Polling. Das Polling kann konfiguriert werden, d.h. wie oft pro Minute soll denn gefragt werden. Sollen die Nachrichten einzeln bearbeitet werden, oder alle auf ein mal etc.

Im Endeffekt läuft alles auf das Locking raus. Der Algorithmus sieht in etwa so aus (und kann in Messaging.java nachgelesen werden):

  1. ein Kommando an die Mongo senden, welches eine Nachricht die entweder direkt für dieses MessagingSystem oder für alle bestimmt ist und exklusiv (also nur ein mal bearbeitet werden darf) markiert ist, für dieses System lockt! Das geschieht mit Hilfe einer UUID die beim Start des Messaging erzeugt wird. Entweder eine oder alle passenden locken, je nach Einstellung.
  2. lesen ob es von mir (=dieses System) gelockte bzw. Nachrichten für alle gibt.
  3. jede dieser Nachrichten bearbeiten.
  4. die Nachricht als bearbeitet markieren (UUID->processed_by)
  5. kurz pausieren und zurück zu 1

Replicaset => OpLogMonitor bzw. ChangeStream

der OpLogMonitor ist ja schon eine Weile teil von Morphium. Mit dem OpLogMonitor wird ein TailableCursor, also ein stets geöffneter Curser, ans OpLog "gehängt". Damit bekommt man eine Nachricht, sobald eine Änderung im OpLog geschieht. Was im Replicaset immer dann der Fall ist, wenn es einen schreibzugriff auf die MongoDB gibt.

Ab Morphium 4.0 wird in diesem Fall auch der ChangeStream dieser Collection genutzt. Damit ist man nicht mehr auf den Zugriff des OpLog angewiesen.

Warum dann nicht einen TailableCursor direkt an die Msg-Collection hängen? Ja, das haben wir uns auch überlegt, leider funktioniert das aus folgenden Gründen nicht wirklich:

  1. tailableCursor funktionieren nur auf Capped Collections. Das ist zwar nicht tragisch in unserem Fall, aber etwas unschön
  2. man bekommt nur "neue" einträge übertragen, nicht jedoch updates. Das ist insbesondere für das Locking exklusiver Nachrichten nötig. Das würde dazu fürhren, dass man diesen Fall eh durch Polling lösen muss. Also kein Gewinn...

Die Verarbeitung der Messages ist im endeffect identisch zu oben, nur kann man sich das Locking vereinfachen. Bei einer eingehenden neuen Nachricht, passiert das:

  1. ist die eingehende Nachricht exklusiv markiert, findet das locking von oben statt aber nur für diese eine Nachricht (die ID hat man ja). Ist somit natürlich effizienter...
  2. ist die eingehende Nachricht nicht exklusiv markiert (und nicht von mir selbst), dann einfach bearbeiten
  3. ist die eingehende Nachricht exklusiv und direkt an mich adressiert, dann bearbeiten

im Falle von Updates einer Nachricht muss ja eigentlich erst mal gar nicht so viel passieren. Dennoch wird in diesem Fall die Nachricht noch mal kurz gelesen und geprüft, ob sie bearbeitet werden muss.

Messaging nutzen

Die Nutzung von Messaging in Morphium ist ziemlich einfach, man erstellt eine Instanz der Klasse Messaging und legt los:

   Messaging messaging=new Messaging(morphium, 500, true);
   messaging.start();

im Idealfall sollte man das Messaging system z.B. über Spring oder so initialisieren.

Nachricht senden

Und dann kann man eigentlich schon loslegen. Eine Nachricht wird wie folgt gesendet:

    messaging.storeMessage(new Msg("Testmessage", "A message", "the value - for now", 5000));

hier wird eine Nachricht mit einer TTL (time to live) von 5 sek gespeichert. Die Default TTL ist 30sek. Ältere Nachrichten werden automatisch gelöscht (aber nicht zwingend genau nach nach 30sek...)

Nachrichten sind per default broadcast, d.h. sie werden von allen verbundenen Clients gelesen und bearbeitet. Würde man die Nachricht auf "Exclusiv" setzen, könnten alle sie lesen, aber nur einer soll sie bearbeiten.

        Msg m = new Msg();
        m.setExclusive(true);
        m.setName("A message");

Diese Nachricht wird nur von einem einzigen empfänger bearbeitet!

Grundsätzlich werden Nachrichten übrigens vom Sender selbst nicht gelesen.

Und zu guter Letzt, man kann nachrichten natürlich auch direkt an einen Empfänger senden. Das passiert z.B. wenn Antworten gesendet werden. Die sollen ja nur vom Sender der ursprünglichen Nachricht bearbeitet werden.

Um eine Nachricht direkt an einen bestimmten Empfänger zu senden, muss man die ID des senders kennen. Das kann einfach durch eine eingehende Nachricht passieren oder man implementiert so eine Art discovery...

        Msg m = new Msg("testmsg1", "The message from M1", "Value");
        m.addRecipient(recipientId);
        m1.storeMessage(m);
storeMessage vs queueMessage

in den Integration-Tests in Morphium werden beide Methoden verwendet. Der Unterschied ist relativ einfach: storeMessage schreibt die Nachricht direkt in die Mongo wohingegen queueMessage asynchron funktioniert. D.h. die Nachricht wird erst entgegengenommen und im Hintergrund geschrieben. Evtl. der bessere Weg für Performance.

Nachrichten Emfpangen

Empfang von Nachrichten ist genauso einfach. im Messaging wird einfach ein MessageListener registriert:

           messaging.addMessageListener((messaging, message) -> {
            log.info("Got Message: " + message.toString());
            gotMessage = true;
            return null;
        });

Dabei ist message die Message und messaging das MessagingSystem. Der listener liefert hier als Ergebnis null zurück, könnte aber auch eine Nachricht als Antwort zurückgeben. Diese würde dann automatisch direkt an den Sender zurückgeschickt.

Mit Hilfe von messaging kann der listener auch auf das MessagingSystem zugreifen und bei Bedarf z.B. selbst Nachrichten versenden, die keine direkte Antwort sein sollten.

Des Weiteren kann der Listener die Bearbeitung "verweigern" indem er eine MessageRejectedException wirft. Der Sender kann auch darüber informiert werden, jedoch wird die Nachricht einfach wieder in den "Pool" zurückgeworfen und kann dann von anderen bearbeitet werden.

Das ist natürlich nur dann wirklich ein Problem, wenn es sich um eine exklusive Nachricht gehandelt hat. Denn dann sollte ein anderer Listener die Nachricht bearbeiten können.

Messaging im Einsatz - Synchronisierung der Caches

Innerhalb von Morphium setzt der CacheSynchronizer auf Messaging auf. Er benötigt im Constructor auch ein Messaging System.

Die Implementierun ist eigentlich recht einfach. Der CacheSynchronizer registriert sich als MorphiumStorageListener in Morphium und wird so über jeden Schreibzugriff informiert (denn nur dann muss ja auch der Cache synchronisiert werden).

public class CacheSynchronizer implements MessageListener, MorphiumStorageListener {

}

Kommt ein Schreibzugriff rein, wird geprüft ob es sich um ein gecachtes Entity handelt und wenn ja, wird eine ClearCache-Message über das Messaging gesendet. Diese beinhaltet, was zu tun ist abhängig von der gewählten Strategie (siehe auch Cache-Annotation in Morphium).

Außerdem bearbeitet der CacheListener natürlich auch eingehende Nachrichten. Kommt eine Nachricht an, wird einfach der in der Message beschriebene Cache gelöscht, das element geändert oder der Cache anderweitig angepasst.

Diese Clear-Nachrichten lassen sich auch über den CacheSynchronizer direkt versenden.

und es sollte auch nicht unerwähnt bleiben, dass man natürlich auch hier einen Listener registrieren kann, um sich über die Synchronisation der Caches informieren zu lassen.

Fazit

das Messaging feature von Morphium führt zu Unrecht ein Schattendasein. Es kann in vielen Fällen als einfacher Ersatz für full-blown MessagingSysteme dienen und kann super einfach eingesetzt werden. Mit der neuen Basis auf OpLogMonitor ist ein wichtiges neues Feature hinzugekommen um Messaging noch häufiger einsetzen zu können.

erstellt Stephan Bösebeck (stephan)