Azure-Warteschlangen aus .NET abfragen Nachrichten-Verarbeitung mit Azure Queue Storage
Anbieter zum Thema
Die Entkoppelung von Cloud Services entscheidet über Elastizität und Skalierbarkeit. Das gilt für Microservice-Architekturen ebenso wie für klassisches Anwendungsdesign. Eine Möglichkeit der Entkoppelung bieten Warteschlangen, beispielsweise auf Basis des Azure Queue Storage.

Genau wie in Google und AWS gibt es auch in Azure eine Reihe von verwalteten Diensten, die sich mit Messaging befassen, darunter Azure Event Grid, Azure Event Hubs, Azure Service Bus, Azure Notification Hubs und Azure Queue Store. Dividiert man allerdings noch einmal gezielt Nachricht und Ereignis auseinander, reduziert sich das Angebot der reinen Messaging-Services in Azure auf Azure Service Bus, Azure Notification Hubs und Azure Queue Storage.
Microsoft Azure Service Bus ist ein vollständig verwalteter Message-Broker für Unternehmen mit Nachrichten-Warteschlangen, der mit einem Publisher-/Subscriber-Prinzip im Push-Betrieb arbeitet. Der Dienst weist große Ähnlichkeiten mit anderen Nachrichtenbrokern wie Apache ActiveMQ auf und nutzt zudem mit dem Advanced Messaging Queueing Protocol (AMQP) 1.0 einen offenen ISO/IEC-Standard.
Eines der Haupteinsatzgebiete ist die Entkoppelung von Anwendungen für eine höhere Zuverlässigkeit und bessere Skalierbarkeit von Anwendungen und Diensten, weil Producer und Consumer nicht gleichzeitig online bzw. immer verfügbar sein müssen. Zudem wird die Last verteilt, damit ein Dienst nicht aufgrund von Datenverkehrsspitzen überlastet wird.
Azure Notification Hubs hingegen stellet eine horizontal skalierbare Push-Engine zur Verfügung, mit der Nutzer von einem beliebigen lokalen oder cloudbasierten Backend aus Benachrichtigungen an eine beliebige Plattform wie z. B. iOS, Android, Windows senden können. Schwerpunkt ist hier das Senden von Nachrichten an Millionen von Empfängern mit niedriger Latenz.
Azure Queue Storage
Muss nicht nur die Zustellung, sondern die auch Verarbeitung von Nachrichten (mindestens einmal, nicht mehrfach) garantiert sein, kommen eher Pull-basierte Dienste wie Azure-Queue-Storage in Frage. Azure Queue Storage ist ein Dienst von Microsoft, der Cloud-basierte Warteschlangen auf Basis eines Azure-Speicherkontos implementiert. Dies erlaubt eine Kommunikation zwischen Komponenten einer verteilten Anwendung. Jede Warteschlange verwaltet dabei eine Liste von Nachrichten.
Dieser Artikel zeigt, wie Developer in der Azure-Cloud den Storage-Dienst „Queues“ (Warteschlangen) z. B. für eine auf .NET basierende Anwendung nutzen können. Diese werden jeweils von einer Absenderkomponente hinzugefügt und können jederzeit von einer Empfängerkomponente, welche die Warteschlange aktiv pullt, verarbeitet werden. Wurde eine Nachricht erfolgreichen verarbeitet, muss sie die Empfängerkomponente von der Warteschlange löschen, um eine doppelte Verarbeitung zu verhindern.
Arbeiten mit Azure Queue Storage unter .NET
Um diesen auf einen Microsoft-Azure-Tutorial basierenden Workshop nachvollziehen zu können, brauchen Sie einen Azure-Account. Darin erstellen Sie ein Speicherkonto in der Azure-Region ihrer Wahl vom Typ „Allgemein Version 2“. Ferner benötigen Sie an ihrem lokalen Arbeitsplatz das die .NET Core SDK-Version 3.1 und einen Editor, am besten den kostenlosen Visual Studio Code.
Sind alle Voraussetzungen erfüllt, erstellen Sie in einer Windows-Konsole Ihrer Wahl (CMD, PowerShell, PowerShell Core oder Azure CLI) mit …
dotnet new
… ein neue Konsolen-App mit einem passenden Namen, z. B. „queue-app“. Sie erhalten dann ein einfaches C#-Projekt mit einer HelloWorld-App. Dann wechseln Sie ins neue Verzeichnis „queue-app“ und leiten einen ersten Build ein:
dotnet build
Das Ergebnis sieht etwa so aus, wie in nebenstehender Abbildung. Nun müssen Sie Ihrem Projekt die Azure Storage Client-Bibliotheken hinzufügen. Das funktioniert für .NET v12SDK mit folgendem Befehl:
dotnet add package Azure.Storage.Queues
Nun öffnen Sie das Projekt durch Eingeben von …
code .
… im Projektverzeichnis z. B. in Visual Studio Code und bearbeiten die Quelldatei „Program.cs“. Hier ergänzen Sie dann den Code um drei neue Namespaces …
using System.Threading.Tasks;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
… unmittelbar nach der Anweisung using System, da unsere App Typen aus diesen Namespaces zum Herstellen einer Verbindung zu Azure Storage und zum Arbeiten mit Warteschlangen verwendet.
Nun bearbeiten Sie die Methode „Main“ der HelloWorld-Beispielanwendung dahingehend, dass sie künftig asynchron ausgeführt wird, wozu Sie void durch einen Rückgabewert von async Task ersetzen. Das muss so sein, weil die App Cloud-Ressourcen verwendet. Das Ergebnis sollte dann so aussehen, wie links im Bild.
Bevor Sie nachher noch den eigentlichen Programm-Code ändern, speichern wie zunächst die geänderte Datei Program.cs. Nun können Sie theoretisch ihre erste Warteschlange erstellen. Allerdings muss die Beispiel-App wenn sie eine Anforderung an Azure Storage senden möchte auch dazu autorisiert sein. Zur Autorisierung des Codes können Sie z. B. die Verbindungszeichenfolgen für die Speicherkontoschlüssel in den Code einbetten. Das ist zwar nicht die empfohlene Vorgehensweise, funktioniert aber für den Anfang am einfachsten.
Sicherer ist die Verwendung von verwalteten Identitäten und Azure AD. Daher fügen wir zum Autorisieren einer Anforderung die Anmeldeinformationen für unser Speicherkonto in Form der Verbindungszeichenfolge eines Speicherkontoschlüssels hinzu. Zwei per Default in jedem Speicherkonto erzeugte Access Keys finden Sie z. B. im Azure Portal im entsprechenden Speicherkonto im Abschnitt „Sicherheit + Netzwerkbetrieb“ unter „Zugriffschlüssel“.
Machen Sie die Schlüssel mit einem Klick auf „Schlüssel“ einblenden sichtbar und kopieren dann eine der beiden verfügbaren Verbindungszeichenfolgen in die Zwischenablage. Um die Verbindungszeichenfolgen zu verwenden, schreiben Sie sie zunächst einfach in eine neue Umgebungsvariable auf Ihrem lokalen Computer. Natürlich läuft die Anwendung dann nur in dieser Umgebung. Unter Windows können Sie das mit …
setx AZURE_STORAGE_CONNECTION_STRING "<myconnectstring>"
… machen, bei Linux/MacOS verwenden sie:
export AZURE_STORAGE_CONNECTION_STRING="<myconnectstring>"
Haben Sie die Umgebungsvariable hinzugefügt, starten Sie alle betroffenen Programme neu, wie z. B. Ihre CMD oder Powershell und Visual Studio Code. Dann bearbeiten Sie erneut die Datei „Program.cs“ im Editor und ersetzen den vorläufigen Programmcode, der ja nur aus der Zeile „Console.WriteLine("Hello, World");“ besteht, durch den Code:
string connectionString = Environment.GetEnvironmentVariable("AZURE_STORAGE_CONNECTION_STRING");
Nun können Sie im Quellcode das eigentliche Warteschlangenobjekt …
QueueClient queue = new QueueClient(connectionString, "mystoragequeue");
… hinzufügen und die Änderung speichern.
Jetzt benötigen Sie noch eine Methode zum Senden einer Nachricht an die Warteschlange. Diese fügen Sie der Klasse Program hinzu. Die Methode erwartet die Übergabe eines Warteschlangenverweises. Falls dieser nicht schon vorhanden ist, erstellt die Methode mit CreateIfNotExistsAsync eine neue Warteschlange und fügt dieser dann durch Aufruf von SendMessageAsync die neue Nachricht hinzu:
static async Task InsertMessageAsync(QueueClient theQueue, string newMessage)
{
if (null != await theQueue.CreateIfNotExistsAsync())
{
Console.WriteLine("The queue was created.")
}
await theQueue.SendMessageAsync(newMessage);
}
Nach dem Speichern fügen wir noch eine weitere Methode zum Entfernen einer Nachricht aus der Warteschlange hinzu: Die Methode RetrieveNextMessageAsync verwendet ReceiveMessagesAsync, um eine Nachricht aus der Warteschlange abzurufen. Übergeben Sie im ersten Parameter den Wert 1, wird nur die die jeweils nächste Nachricht in der Warteschlange übergeben. Das Löschen einer empfangenen Nachricht (nach erfolgreicher Bearbeitung) erledigt dann der Aufruf von DeleteMessageAsync.
static async Task<string> RetrieveNextMessageAsync(QueueClient theQueue)
{
if (await theQueue.ExistsAsync())
{
QueueProperties properties = await theQueue.GetPropertiesAsync();
if (properties.ApproximateMessagesCount > 0)
{
QueueMessage[] retrievedMessage = await theQueue.ReceiveMessagesAsync(1);
string theMessage = retrievedMessage[0].Body.ToString();
await theQueue.DeleteMessageAsync(retrievedMessage[0].MessageId,
retrievedMessage[0].PopReceipt);
return theMessage;
}
return null;
}
return null;
}
Auch diesen Code fügen wir noch in die Programm-Klasse ein. Gibt es Befehlszeilenargumente, die Sie an die App übergeben, z. B. eine Nachricht, die der Warteschlange hinzugefügt werden soll, müssen Sie die Argumente zu einer Zeichenfolge verknüpfen. Diese Zeichenfolge fügen Sie dann durch Verwendung der oben ergänzten Methode InsertMessageAsync der Nachrichtenwarteschlange hinzu:
static async Task Main(string[] args)
{
string connectionString = Environment.GetEnvironmentVariable("AZURE_STORAGE_CONNECTION_STRING");
QueueClient queue = new QueueClient(connectionString, "mystoragequeue");
if (args.Length > 0)
{
string value = String.Join(" ", args);
await InsertMessageAsync(queue, value);
Console.WriteLine($"Sent: {value}");
}
else
{
string value = await RetrieveNextMessageAsync(queue);
Console.WriteLine($"Received: {value}");
}
Console.Write("Press Enter...");
Console.ReadLine();
}
Speichern und übersetzen Sie den gesamten Code nun mit:
dotnet build
Das Hinzufügen einer neuen Nachricht zur Warteschlange erreichen Sie dann durch Aufruf von …
dotnet run Meine erste Nachricht
Sie können den Aufruf entweder beliebig oft wiederholen oder mit neuen Argumenten aufrufen und dann im Azure-Portal leicht prüfen, dass die Nachricht (en)vorhanden ist.
Möchten Sie die Nachrichten per CLI abrufen, geben Sie für die erste Nachricht einfach …
dotnet run
… (ohne Argumente) ein. Jeder weitere Abruf, fördert dann die jeweils nächste Nachricht zutage.
Dieser Methode wird ein Warteschlangenverweis übergeben. Wenn sie nicht bereits vorhanden ist, wird durch Aufrufen von CreateIfNotExistsAsync eine neue Warteschlange erstellt. Anschließend fügt sie der Warteschlange durch Aufrufen von SendMessageAsync die newMessage hinzu.
(ID:47613027)