Wenn Netflix weiß, was du gerade fühlst Generative KI nutzt emotionales Feedback
Anbieter zum Thema
Generative KI kann Inhalte in Echtzeit auf der Grundlage von Gesichtsausdrücken erstellen. Wie das funktioniert und wie der Code dazu aussieht, verrät Datenmanagement-Spezialist Solita.

Wenn man sich nicht mehr aktiv für einen von 20 Filmen entscheiden muss, sondern der Fernseher einem die Präferenz gleichsam von den Augen abliest – dann steckt dahinter nicht zwingend Grusel-Science-Fiction. Vielmehr geht es hier um eine einfache Applikation, die Gesichtsausdrücke analysiert, um auf Basis emotionaler Reaktionen Inhalte in Echtzeit zu erstellen.
Der Anspruch: Eine Person liest eine Geschichte, die sich aufgrund der jeweiligen Reaktion spontan verändert. Hierfür verwendet die Anwendung – wer hätte es gedacht? – generative KI (GenAI). Um eine solche Applikation umzusetzen, nutzten wir bei Solita (seit Kurzem Microsoft Advanced Specialist für AI und Machine Learning) den Azure-KI-Stack.
Bilderfassung und Abruf von Gesichtsausdrücken
Um das Ziel zu erreichen, sind zunächst einige technische Schritte erforderlich. Ein Python-Skript (siehe Abbildung), das auf einem Edge-Gerät in der Azure-Cloud läuft, wird in einer Endlosschleife ausgeführt. Das Gerät nimmt ein Bild auf, ruft den „Azure Cognitive Services FaceAPI“-Endpunkt auf und schreibt die Antwort auf die Stimmung in einen Azure Blob Storage-Container.
ENDPOINT = "https://<resource>.cognitiveservices.azure.com/"
KEY = <key>
face_client = FaceClient(ENDPOINT, CognitiveServicesCredentials(KEY))
attributes = ['age', 'gender', 'headPose', 'smile', 'facialHair', 'glasses', 'emotion', 'hair', 'makeup, 'occlusion', 'blur', 'exposure', 'noise']
cap = cv2.VideoCapture(0)
while True:
ret, frame = cap.read()
rgb = c2.cvtColor(frame, cv2.COLOR_BGR2BGRA)
cv2.imshow('frame', rgb)
jpg_buffer = imencode('.jpg', frame)[1]
json_response = (
face_client
.face
.detect_with_stream(
BytesIO(jpg _buffer),
return_face_attributes=attributes)
.response
.json())
json_binary = (
dumps(json_response,ensure_ascii=False) .encode('utf-8', 'ignore'))
json_path = 'face_api_json/{}.json'.format(<epoch>)
blob = container.get_blob_client(json_path)
blob.upload_blob(json_binary, overwrite=True)
Kontinuierliche Dateneingabe mit Azure Databricks und Spark Streaming
Für die weitere reibungslose Datenverarbeitung nutzen wir das Spark-Framework. Man stelle sich dafür nicht nur ein Gesicht vor, sondern 10.000 Personen an 10.000 verschiedenen Orten – macht 100 Millionen Stimmungen, die alle ein bis zwei Sekunden verarbeitet werden müssen.
Spark ermöglicht das parallele und verteilte Verarbeiten entsprechend großer Datenströme. Der folgende Code nimmt die Daten aus einem Cloud-Speicher auf und schreibt sie als Stream in eine Delta-Tabelle.
spark.readStream.option("inferSchema", True).json(<src_data_storage_path>, schema=schema).selectExpr(<attributes>, "current_timestamp() AS load_time_bronze").writeStream \
.format("delta") \
.option("checkpointLocation", <checkpoint>) \
.toTable(<delta_table_bronze>)
Zugriff auf Azure OpenAI LLM in einem Stream
Von dort übernimmt ein weiterer Stream. Er greift über die Chat Completion API (einen Teil der Azure OpenAI Suite) auf das Modell zu, verwendet das Sentiment und den Nachrichtenverlauf in der Eingabeaufforderung und schreibt die Antwort in eine nächste Deltatabelle. Ergebnis ist die auf dem Sentiment basierende generierte Story. Der Nachrichtenhistorie folgend, würde die Konversation so aussehen:
- Systemnachricht (für Kontext, Anweisungen oder andere Informationen, die für den Anwendungsfall relevant sind).
- Aufforderungsnachricht (vom User)
- Antwort (vom Assistenten)
- Aufforderungsnachricht (vom User)
- etc.
Die Nachrichtenliste wäre dann die Eingangsnachricht für das Modell. Die untenstehende Abbildung zeigt die anfängliche Nachrichtenliste. Sie wird im Verlauf der Story immer umfangreichen, da die Konversation an sie angehängt wird.
# Initial message list write to Delta table
message_list = [
{"role": "system", "content": "You are an AI storyteller that helps people get in the right mood."),
("role": "user", "content": "Please provide the beginning of the Shrek story as is from the movie transcript'),
("role": "assistant", "content": "Once upon a time there was a lovely princess. But she had an enchantment upon her of a fearful sort which could only be broken by love's first kiss. She was locked away in a castle guarded by a terrible fire-breathing dragon. Many brave knights had attempted to free her from this dreadful prison, but none prevailed. She waited in the dragon's keep in the highest room of the tallest tower."}]
Im Folgenden erstellt man eine benutzerdefinierte Funktion, die die auf der Stimmung basierende Benutzernachricht zur Nachrichtenliste hinzufügt. Sie fordert das Modell zur Vervollständigung mit Hilfe der Nachrichtenliste auf, fügt die Antwort zur Nachrichtenliste hinzu und gibt sie zurück.
def openai_api_call(user_message):
# Set API variables
openai.api_type = "azure"
openai.api_base = https://<RESOURCE>.openai.azure.com/
openai.api_version = <VERSION>
openai.api_key = <KEY>
try:
# Add user message to message list
message_list.append({"role": "user", "content": user_message})
response = openai.ChatCompletion.create(
engine = <engine>,
messages = message_list,
temperature=0.7,
max_tokens=800,
top_p=0.95,
frequency_penalty=0,
presence_penalty=0,
stop=None)
resp_message = response.choices[0].message.content.strip()
# Add assistant response to message list
message_list.append("role": "assistant", "content": resp_message})
return resp_message
except openai.error.InvalidRequestError as e:
return None
# Register function as UDF
openai_api_call_udf = udf(openai_api_call)
Die obige benutzerdefinierte Funktion verwendet eine Benutzernachricht als Eingabe. Wir erstellen nun eine benutzerdefinierte Funktion, die sie erzeugt. Die Funktion nimmt das Sentiment als Eingabe und erstellt die Benutzernachricht je nach Emotion.
def write_user_message(emotion):
if emotion == "happy":
user_message = "I am happy about the content, please continue the Shrek story in the same fashion rom where you left off."
elif emotion == "sad":
user_message = "I am not satisfied about the content, please modify the Shrek story to be more dramatic and exciting but continue from where you left off."
return user_message
# Register function as UDF
write_user_message_udf = udf(write_user_message)
An dieser Stelle nun kann der Stream starten und dem KI-Erzähler Leben einhauchen. Wir verwenden eine processBatch()-Funktion im Databricks Spark Structured Streaming, was mehrere Gründe hat: Sie eignet sich für die Stapelverarbeitung innerhalb einer Streaming-Anwendung. Führt man API-Aufrufe innerhalb einer Spark Structured Streaming-Anwendung durch, kann es Situationen geben, in denen es praktischer ist, einen Batch zu erstellen, bevor man den API-Aufruf durchführt. Gründe dafür sind API-Ratenbegrenzung und -Effizienz, Atomizität und Konsistenz.
def process_batch(df, epoch_id):
# Sort batch and store the latest record in a DataFrame
df = df.orderBy(df["load_time_bronze"].desc()).limit(1)
# Call write_user_message UDF -> get user message based on emotion and add as column
df = df.withColumn("user_message", write_user_message_udf(df.leading_emotion))
# Call openai_api_call UDF -> get response message and add as column
df = df.withColumn("response_message", openai_api_call_udf(df.user_message))
# Write the data to Delta table
df.write.format("delta").mode("append").saveAsTable(delta_table_silver)
# Read bronze delta table, add and drop columns
df = (spark.readStream \
.format("delta") \
.option("inferColumnTypes", "true") \
.table(delta_table_bronze) \
.withColumn("leading_emotion", when(col("happiness") == greatest(col("happiness"), col("sadness")), lit("happy")) \
.otherwise(lit("sad"))) \
.withColumn("load_time_silver", current_timestamp()) \
.drop("anger", "contempt", "disgust", "fear", "neutral", "surprise"))
# Define a streaming query to process the data using the process_batch function and wait for the query to terminate
df.writeStream \
.foreachBatch(process_batch) \
.option("checkpointLocation", checkpoint_delta) \
.start() \
.awaitTermination()
Nun, da klar ist, wie die Daten durch die verschiedenen Komponenten fließen, können wir die Lösung entwerfen.
Learnings
LLM-basierte Anwendungen lassen sich recht einfach mit bereits vorhandenen Komponenten von Cloud-Anbietern erstellen. Der größte Aufwand besteht darin, sie benutzerfreundlich zu gestalten und das Modell hinter dem User Interface so aufzubauen, dass es wie erwartet funktioniert. Durch Feinabstimmung des LLM lässt sich dieses für den speziellen Anwendungsfall noch relevanter und effektiver zu machen.
Diese Anwendung kann in verschiedenen Aspekten des täglichen Lebens äußerst hilfreich sein – nicht nur für Einzelpersonen, die gerne maßgeschneiderte Geschichten lesen, sondern auch in Unternehmen oder in der Öffentlichkeit, z. B. bei der Erstellung von Reden, für die Entwicklung maßgeschneiderter Marketingkampagnen oder bei der Generierung von Witzen für einen Stand-up-Comedian – natürlich in Echtzeit und stimmungsangepasst. Die Latenzzeit der Anwendung von einem aufgenommenen Bild bis zur generierten Geschichte betrug etwa sechs bis sieben Sekunden, was als nahezu Echtzeit gelten darf.
Natürlich gab es auch einige technische Hürden. So waren die Verfolgung der Sitzung und Speicherung der vorherigen Antworten des Modells bei der Verarbeitung von Daten mit einem Spark-Stream etwas mühsam. Die globale Variable message_list konnte ihren Wert zwischen den Stapeln nicht beibehalten. Offenbar behalten globale Variablen bei Verwendung der Funktion processBatch() in Spark Structured Streaming ihre Werte zwischen den Stapeln nicht bei, da jeder Stapel unabhängig verarbeitet wird.
* Jens Jakobsson ist Data Scientist bei Solita.
(ID:49728231)