Streamen von Twitter-Daten mit Google Cloud Pub/Sub und Apache Beam

Pawel Jedrzejewicz
Pawel Jedrzejewicz
May 6, 2025
14 min read
Loading the Elevenlabs Text to Speech AudioNative Player...

Einführung

Da immer mehr Dienste und Geräte große Datenmengen produzieren, versuchen Unternehmen auf der ganzen Welt, Analysen durchzuführen und Erkenntnisse so schnell wie möglich zu gewinnen. In einigen Fällen, wie z. B. beim Internet der Dinge, bei der Überwachung sozialer Medien oder bei der Abwicklung von Finanztransaktionen, werden die Daten kontinuierlich generiert und müssen unbedingt ständig verarbeitet werden, was bedeutet, dass sie als Streaming-Daten betrachtet werden. In diesem Artikel beschreiben wir kurz die Methoden der Streaming-Datenverarbeitung und geben ein Beispiel für eine Datenpipeline, die Tweets mithilfe der Twitter-API verarbeitet und die Daten mithilfe von Pub/Sub und Dataflow in Google Cloud BigQuery speichert.

Was sind Streaming-Daten und wie können wir sie verarbeiten?

Streaming-Daten werden als kontinuierlicher Informationsfluss betrachtet, der normalerweise in großen Mengen und mit hoher Geschwindigkeit generiert wird. Diese Art von Daten kann aus verschiedenen Quellen generiert werden, die Ereignisse protokollieren, sobald sie auftreten, z. B. indem sie Klicks auf der Website aufzeichnen, IoT-Sensoren die Temperatur messen oder Beiträge auf Instagram überwachen. Im Gegensatz zur herkömmlichen Stapelverarbeitung, bei der große Mengen bereits gespeicherter Daten auf einmal geladen werden, müssen Streaming-Daten kontinuierlich verarbeitet werden, so wie sie in der Quelle erscheinen. Das heißt, wenn Daten in der Quelle erscheinen, sollte die Streaming-Pipeline sie sofort verarbeiten, ohne dass sie separat heruntergeladen, gespeichert und stapelweise verarbeitet werden müssen. Um die Streaming-Daten zu verarbeiten, können wir eines der vielen verfügbaren Tools auswählen, z. B.:

  • Google Cloud-Datenfluss
  • Amazon Kinesis
  • Azure Stream-Analytik
  • Apache Beam
  • Apache Kafka
  • Apache Storm

Besuchen Sie unseren Blog, um die Methodik des Datenstreamings besser zu verstehen:

Beispiel einer Streaming-Pipeline

Stellen Sie sich vor, Sie möchten Beiträge überwachen, die auf Twitter erscheinen und mit einem bestimmten Tag verknüpft sind, z. B. „Google“. Natürlich können Sie zu Twitter gehen, geben Sie einfach ein Wort in die Suchleiste ein und Sie erhalten die Ergebnisse, aber wenn Sie dies zuverlässig tun und schnell einige Erkenntnisse generieren möchten, wäre es besser, die automatisierte Pipeline zu erstellen, die den Job erledigt. Um den Prozess zu demonstrieren, wurde die Streaming-Datenverarbeitungspipeline erstellt.Ihr Zweck besteht darin, Tweets herunterzuladen, die zu einem bestimmten Thema erscheinen, und sie in Google Cloud zu ingestieren. Der Prozess kann in der folgende Schritte:

  1. Laden Sie Tweets mit der Twitter-API herunter (https://developer.twitter.com/en/docs/twitter-api).
  2. Senden Sie jeden Tweet als Nachricht an Google Pub/Sub.
  3. Verarbeiten Sie die Tweets mithilfe der Apache Beam-Pipeline und aggregieren Sie die Daten.
  4. Speichern Sie sowohl unformatierte Tweets als auch aggregierte Tweet-Daten in BigQuery-Tabellen.

Zugreifen auf die Twitter API

Twitter bietet eine zuverlässige Möglichkeit, programmgesteuert über die API auf seine Inhalte zuzugreifen. Es ermöglicht Entwicklern den Zugriff auf Kernelemente von Twitter wie: Tweets, Benutzer oder Nachrichten. Im vorgestellten Beispiel wird auf die API mithilfe des Tweepy-Pakets zugegriffen, was eine sehr bequeme Möglichkeit ist, mit Python auf die Twitter-API zuzugreifen (https://docs.tweepy.org/en/stable/) .Bevor wir auf die API zugreifen können, müssen wir das Twitter-Konto erstellen und uns im Entwicklerportal anmelden - https://developer.twitter.com/en/docs/developer-portal/overview. Sobald wir die erhalten Inhaber-Token Für die Authentifizierung können wir mit dem Zugriff auf die API beginnen. Um zu testen, ob sie funktioniert, können Sie einfach eine einfache Anfrage stellen, indem Sie einen cURL-Befehl ausführen. Ersetzen Sie einfach $ACCESS_TOKEN und $USERNAME durch Ihren Inhaber-Token und Twitter username.curl "https://api.twitter.com/2/users/by/username/$USERNAME" -H „Autorisierung: Bearer $ACCESS_TOKEN“

Tweets an Pub/Sub senden

Wenn der Twitter-Zugriff eingerichtet ist, können wir mit der Erstellung des Python-Skripts fortfahren, das ein Tag auf Twitter überwacht. Wenn mit dem Tag verbundene Tweets erscheinen, werden sie an das Google Cloud Pub/Sub-Thema gesendet.

HinweisGoogle Cloud Pub/Sub ist ein Messaging-Dienst für den Austausch von Eventdaten zwischen Anwendungen und Diensten. Er ermöglicht die Kommunikation zwischen den Diensten, indem Sender und Empfänger entkoppelt werden.

Bevor wir das Skript erstellen, müssen wir ein Pub/Sub-Thema und ein Dienstkonto in Google Cloud erstellen. Um ein Thema zu erstellen, gehen Sie einfach zum Pub/Sub-Abschnitt in der Google Cloud-Konsole (https://console.cloud.google.com/cloudpubsub) und klicken Sie auf THEMA ERSTELLEN knopf. In unserem Beispiel heißt das Thema Tweet-Test.

Als Nächstes müssen wir das Dienstkonto erstellen. Gehen Sie dazu zu IAM & Admin -> Service Accounts -> Service Account erstellen. Sobald Sie den Namen festgelegt haben (im Beispiel ist es Twitter-Test) klicken Sie auf ERSTELLEN UND FORTFAHREN.Als Nächstes spezifizieren wir die Rollen, die für die SA benötigt werden, set Herausgeber Pub/Sub und BigQuery-Dateneditor Rollen und klicken Sie auf Fertig.

Als nächstes gehen wir zu unserem neuen Servicekonto und wählen Schlüssel und füge einen neuen JSON-Schlüssel hinzu, damit wir auf unsere SA zugreifen können.

Dann laden wir einfach den Schlüssel herunter und setzen die Benutzer-Umgebungsvariable auf unserem lokalen Rechner:export GOOGLE_AUTH_CREDENTIALS= <path_to_json_key_file>Jetzt sollten unser Servicekonto und das Pub/Sub-Thema startklar sein, damit wir mit der Erstellung des Python-Skripts für das Tweet-Streaming fortfahren können. Installieren Sie zuerst die erforderlichen Bibliotheken:pip install --upgrade pip
pip installiere google-cloud-pubsub
pip install tweepy
pip install apache-beam [gcp] Als nächstes erstellen Sie das Python-Skript mit dem Namen stream_to_pubsub.py:# stream_to_pubsub.py
argparse importieren
json importieren


aus google.cloud importiere pubsub_v1
Tweepy importieren


def parse_args ():
Parser = argParse.ArgumentParser ()

parser.add_argument ('--bearer_token', typ=str, required=Wahr)
parser.add_argument ('--stream_rule', typ=str, required=Wahr)
parser.add_argument ('--project_id', typ=str, required=Wahr)
parser.add_argument ('--topic_id', typ=str, required=Wahr)

gib parser.parse_args () zurück


def write_to_pubsub (Daten, Stream_Regel):
data ["stream_rule"] = Streamregel
data_formatted = json.dumps (data) .encode („utf-8")
id = Daten ["ID"] .encode („utf-8")
author_id = data ["author_id"] .encode („utf-8")

Zukunft = publisher.publish (
Themenpfad, Datenformat, ID=ID, Autor_ID=Autor_ID
)
drucken (future.result ())


Klasse Client (Tweepy.StreamingClient):
def __init__ (self, bearer_token, stream_rule):
super (). __init__ (Träger-Token)

self.stream_rule = Streamregel

def on_response (selbst, Antwort):
tweet_data = response.data.data
user_data = response.includes ['Benutzer'] [0] .data
Ergebnis = Tweet_Data
Ergebnis ["user"] = Benutzerdaten

write_to_pubsub (Ergebnis, self.stream_rule)


wenn __name__ == „__main__“:
tweet_fields = ['id', 'text', 'author_id', 'created_at', 'lang']
user_fields = ['Beschreibung', 'created_at', 'Standort']
Erweiterungen = ['author_id']

Argumente = parse_args ()
streaming_client = Klient (args.bearer_token, args.stream_rule)
Herausgeber = PubSub_v1.publisherClient ()
topic_path = publisher.topic_path (args.project_id, args.topic_id)

# bestehende Regeln entfernen
Regeln = streaming_client.get_rules () .data
wenn die Regeln nicht None sind:
existing_rules = [rule.id für die Regel in streaming_client.get_rules () .data]
streaming_client.delete_rules (ids=existierende_regeln)


# füge neue Regeln hinzu und starte Stream
streaming_client.add_rules (tweepy.streamRule (args.stream_rule))
<Bearer-token><your-project-id><your-topic-id>streaming_client.filter (tweet_fields=tweet_fields, expansions=expansions, user_fields=user_fields) Dann kann das Skript mit dem folgenden Befehl ausgeführt werden: python3 stream_to_pubsub.py --bearer_token "" --stream_rule Google --project_id "" --topic_id "„Während es läuft, können wir zu GCP gehen, um zu überprüfen, ob die Nachrichten ankommen. Gehe zu Pub/Sub -> Abos -> unser neues Abo (hier tweets-test-sub) -> Nachrichten und klicke ZIEHEN. Sie sollten die Nachrichten sehen, die den neu erstellten Tweets entsprechen, die mit dem im Feld festgelegten Schlüsselwort verknüpft sind Stream_Regel Variable im Skript.

Wie verarbeitet man Streaming-Daten mit Apache Beam?

Um die Streaming-Daten zu verarbeiten, verwenden wir Apache Beam, das einheitliche Programmiermodell zur Ausführung von Datenverarbeitungspipelines, sowohl Batch- als auch Streaming-Pipelines. Die erstellte Pipeline verwendet das Pub/Sub-Thema als Eingabe und gibt die Daten in zwei BigQuery-Tabellen aus:

  • raw_tweets - Tweet-Text mit den Metadaten und Autoreninformationen
  • Zählt die Minutenebene - Gesamtzahl der Tweets, gruppiert nach Minute und Tweet-Sprache

Der Pipeline-Code kann auf Github abgerufen werden: https://github.com/klepacz/tweet-streaming/blob/main/pipeline.py Bevor wir eine Pipeline ausführen, müssen wir in BigQuery einen neuen Datensatz erstellen, um unsere Ausgabe zu speichern. Gehen Sie dazu in Ihrer Google Cloud-Konsole zu BigQuery, klicken Sie dann auf drei Punkte neben dem Projektnamen und klicken Sie auf Datensatz erstellen. Nennen Sie es Twitter_Daten und klicken Sie Datensatz erstellen.Sobald Sie das Pipeline-Modul und den BQ-Datensatz erstellt haben, können Sie die Lösung testen, indem Sie den pipeline.py Skript in einem Terminal mit dem folgenden Befehl: python3 pipeline.py\
<yout-project_id>--projekt-id ""\
<yout-project_id><your-topic-id>--input_topic „projects/ /topics/ „und stream_to_pubsub.py in einem anderen Terminal. Nach ein paar Minuten können wir die Ausgabetabellen überprüfen, um die Ergebnisse zu sehen. Wir sollten zwei BigQuery-Tabellen erstellen lassen: Zählt die Minutenebene und raw_tweets. Sie können die Skripts noch einige Minuten laufen lassen, um zu sehen, wie die neuen Daten an die Tabellen angehängt werden.Die Zählt die Minutenebene Die Tabelle sollte so aussehen, wir können sehen, wie viele Tweets pro Minute in jeder Sprache erschienen sind:

Hinweis Beachten Sie, dass die Zählt die Minutenebene Die Tabelle wird jede Minute aktualisiert und die Zeitstempel Die Spalte ändert sich mit dem Intervall von einer Minute. Dies wird durch die Windowing-Konfiguration in der Beam-Pipeline verursacht, die auf eine feste Größe von eingestellt ist 60 Sekunden. Sie können das ändern Fenstergröße Wert im Pipeline-Code, um das Intervall der Datenaggregation anzupassen.

Bereitstellung der Lösung in Google Cloud

Jetzt sollte unsere Lösung korrekt funktionieren, aber was ist, wenn wir das Tweet-Streaming für einen längeren Zeitraum, z. B. eine Woche, ausführen wollen? Nun, wir können Google Cloud für die Bereitstellung verwenden und seine Funktionen weiter untersuchen.

Bereitstellen des Pub/Sub-Streaming-Dienstes

Um das Modul zum Senden von Tweets an Pub/Sub bereitzustellen, können wir einfach die VM auf der Compute Engine erstellen und unsere ausführen stream_to_pubsub.py dort ein Skript. Gehen Sie dazu zu Compute Engine -> VM-Instanzen -> Instanz erstellen und wählen Sie dann den Maschinennamen und die Größe. Hier ist es benannt Streamen von Tweets und für den Maschinentyp e2-Mikro wurde ausgewählt, da wir in unserem Fall keine hohe Arbeitsbelastung erwarten.

Scrollen Sie als Nächstes nach unten zu Identität und API-Zugriff und wählen Sie dasselbe Dienstkonto aus, das Sie zuvor erstellt haben (im Beispiel ist es Twitter-Test). Wir müssen das angeben, da das Standarddienstkonto keinen Zugriff auf Pub/Sub hat und wir unser Skript nicht ausführen können.

Jetzt können wir weitermachen und zuschlagen Erstellen und nach ein paar Minuten können wir uns mit der VM verbinden, indem wir auf SSH unter der Verbindungsspalte. Dann müssen wir installieren Tweepy indem Sie die folgenden Befehle auf dem Computer ausführen:sudo apt-get updatesudo apt-get -

Share this post
Data Engineering
Pawel Jedrzejewicz
MORE POSTS BY THIS AUTHOR
Pawel Jedrzejewicz

Curious how we can support your business?

TALK TO US