Wprowadzenie
Ponieważ coraz więcej usług i urządzeń wytwarza duże ilości danych, firmy na całym świecie starają się jak najszybciej przeprowadzić analizy i generować spostrzeżenia. W niektórych przypadkach, takich jak IoT, monitorowanie mediów społecznościowych lub obsługa transakcji finansowych, dane są generowane w sposób ciągły i niezbędne jest ich ciągłe przetwarzanie, co oznacza, że są uważane za dane strumieniowe. W tym artykule krótko opisujemy metody przetwarzania danych strumieniowych i podajemy przykład potoku danych, który przetwarza tweety za pomocą Twitter API i zapisuje dane w Google Cloud BigQuery za pomocą Pub/Sub i Dataflow.
Co to jest przesyłanie strumieniowe danych i jak możemy je przetwarzać?
Przesyłanie strumieniowe danych jest uważane za ciągły przepływ informacji, który jest zwykle generowany w dużych ilościach i z dużą prędkością. Tego rodzaju dane mogą być generowane z różnych źródeł, które rejestrują zdarzenia w miarę ich wystąpienia, np. rejestrowanie kliknięć na stronie internetowej, czujniki IoT mierzące temperaturę lub monitorowanie postów pojawiających się na Instagramie. W przeciwieństwie do tradycyjnego przetwarzania wsadowego, które odpowiada ładowaniu dużych ilości już przechowywanych danych jednocześnie, dane przesyłane strumieniowo muszą być przetwarzane w sposób ciągły, ponieważ pojawiają się na źródle. Oznacza to, że gdy dane pojawiają się na źródle, potok strumieniowy powinien je przetworzyć od razu, bez konieczności oddzielnego pobierania, przechowywania i przetwarzania partii.Aby przetworzyć dane przesyłane strumieniowo, możemy wybrać jedno z wielu dostępnych narzędzi, takich jak:
- Przepływ danych w chmurze Google
- Amazon Kinesis
- Usługa Azure Stream Analytics
- Belka Apache
- Apache Kafka
- Burza Apache
Odwiedź nasz blog, aby lepiej zrozumieć metodologię przesyłania danych:
- https://dsstream.com/streaming-data-architecture/
- https://dsstream.com/the-best-data-stream-solutions-and-how-to-use-them/
Przykład rurociągu strumieniowego
Wyobraź sobie, że chcesz monitorować posty pojawiające się na Twitterze, które są połączone z określonym tagiem, np. „Google”. Oczywiście możesz przejść do Twittera, po prostu wpisać słowo w pasku wyszukiwania, a otrzymasz wyniki, ale jeśli chcesz to zrobić niezawodnie i szybko wygenerować spostrzeżenia, lepiej byłoby utworzyć automatyczny potok, który wykona zadanie.Aby zademonstrować proces, utworzono potok przetwarzania danych strumieniowych.Jego celem jest pobranie tweetów, które pojawiają się na określony temat i połknięcie ich do Google Cloud.Proces można opisać w tweetach następujące kroki:
- Pobierz tweety za pomocą interfejsu API Twittera (https://developer.twitter.com/en/docs/twitter-api).
- Wyślij każdy tweet jako wiadomość do Google Pub/Sub.
- Przetwarzaj tweety za pomocą potoku Apache Beam i agreguj dane.
- Zapisz zarówno surowe tweety, jak i zagregowane dane tweetów w tabelach BigQuery.
Dostęp do interfejsu API Twittera
Twitter zapewnia niezawodny sposób programowego dostępu do treści za pośrednictwem interfejsu API. Umożliwia programistom dostęp do podstawowych elementów Twittera, takich jak: Tweety, Użytkownicy czy Wiadomości.W przedstawionym przykładzie dostęp do interfejsu API jest dostępny za pomocą pakietu tweepy, co jest bardzo wygodnym sposobem dostępu do Twittera API za pomocą Pythona (https://docs.tweepy.org/en/stable/) .Zanim będziemy mogli uzyskać dostęp do interfejsu API, musimy utworzyć konto na Twitterze i zalogować się do portalu programisty - https://developer.twitter.com/en/docs/developer-portal/overview. Po otrzymaniu Żeton na okaziciela w celu uwierzytelniania możemy rozpocząć dostęp do API. Aby sprawdzić, czy działa, możesz po prostu złożyć proste żądanie, uruchamiając polecenie cURL. Po prostu zastąp $ACCESS_TOKEN i $USERNAME na swój Żeton na okaziciela i nazwa użytkownika Twittera. curl "https://api.twitter.com/2/users/by/username/$USERNAME" -H „Autoryzacja: Otrzymujący $ACCESS_TOKEN”
Wysyłanie tweetów do Pub/Sub
Po skonfigurowaniu dostępu do Twittera możemy przystąpić do tworzenia skryptu Pythona, który ogląda tag na Twitterze, a jeśli pojawią się tweety połączone z tagiem, są wysyłane do tematu Google Cloud Pub/Sub.
UwagaGoogle Cloud Pub/Sub to usługa przesyłania wiadomości służąca do wymiany danych zdarzeń między aplikacjami i usługami. Umożliwia komunikację między usługami poprzez oddzielenie nadawców od odbiorców.
Przed utworzeniem skryptu musimy utworzyć temat Pub/Sub i konto usługi w Google Cloud.Aby utworzyć temat, wystarczy przejść do sekcji Pub/Sub w konsoli Google Cloud (https://console.cloud.google.com/cloudpubsub) i kliknij UTWÓRZ TEMAT przycisk. W naszym przykładzie temat jest nazwany test tweetów.

Następnie musimy utworzyć konto usługi. Aby to zrobić, przejdź do IAM & Admin -> Konta serwisowe -> Utwórz konto usługowe.Po ustawieniu nazwy (w przykładzie jest test na Twitterze) kliknij TWÓRZ I KONTYNUUJ.Następnie określamy role potrzebne dla SA, ustawiamy Pub/Edytor podrzędny a BigQuery Edytor danych role i kliknij Gotowe.

Następnie przechodzimy do naszego nowego konta serwisowego, wybierz klucze i dodaj nowy klucz JSON, abyśmy mogli uzyskać dostęp do naszego SA.

Następnie po prostu pobieramy klucz i ustawiamy zmienną env użytkownika na naszej lokalnej maszynie:export GOOGLE_AUTH_CREDENTIALS= <path_to_json_key_file>Teraz nasze konto usługi i temat Pub/Sub powinny być gotowe, więc możemy kontynuować tworzenie skryptu python dla strumieniowego tweeta.Najpierw zainstaluj niezbędne biblioteki:pip install --upgrade pip
pip zainstaluj google-cloud-pubsub
pip install tweepy
pip install apache-beam [gcp] Następnie utwórz skrypt python o nazwie Plik stream_to_pubsub.py:# stream_to_pubsub.py
import argparse
import json
z google.cloud import pubsub_v1
importowanie tweepy
def parse_args ():
parser = argParse.ArgumentParser ()
parser.add_argument ('--bearer_token', type=str, required=true)
parser.add_argument ('--stream_rule', type=str, required=true)
parser.add_argument ('--project_id', type=str, required=true)
parser.add_argument ('--topic_id', type=str, required=true)
zwróć parser.parse_args ()
def write_to_pubsub (dane, reguła strumienia):
dane ["reguła_strumienia"] = reguła strumienia
data_formatted = json.dumps (dane) .encode („utf-8")
id = dane ["id"] .encode („utf-8")
author_id = dane ["identyfikator_autora"] .encode („utf-8")
przyszłość = wydawca.publish (
ścieżka tematu, formatowane dane, id=id, author_id=author_id
)
drukuj (future.result ())
klasa Klienta (Tweepy.StreamingClient):
def __init__ (self, bearer_token, stream_rule):
super (). __init__ (posiadacz_token)
self.stream_rule = Reguła strumienia
def on_response (ja, odpowiedź):
tweet_data = response.data.data
user_data = response.include ['users'] [0] .data
wynik = tweet_data
wynik ["użytkownik"] = dane użytkownika
write_to_pubsub (wynik, self.stream_rule)
jeśli __nazwa__ == „__main__”:
tweet_fields = ['id', 'text', 'author_id', 'created_at', 'lang']
user_fields = ['opis', 'created_at', 'lokalizacja']
rozszerzenia = ['autor_id']
args = parse_args ()
streaming_client = Klient (args.bearer_token, args.stream_rule)
wydawca = PubSub_v1.PublisherClient ()
topic_path = wydawca.ścieżka_topic_( args.project_id, args.topic_id)
# usuń istniejące reguły
reguły = streaming_client.get_rules () .data
jeśli reguły nie są żadne:
existing_rules = [rule.id dla reguły w streaming_client.get_rules () .data]
streaming_client.delete_rules (ids=istniejące_reguły)
# dodaj nowe reguły i uruchom strumień
streaming_client.add_rules (Tweepy.streamRule (args.stream_rule))
<Bearer-token><your-project-id><your-topic-id>streaming_client.filter (tweet_fields=tweet_fields, expansions=expansions, user_fields=user_fields) Następnie skrypt można uruchomić za pomocą następującego polecenia:python3 stream_to_pubsub.py --bearer_token "" --stream_rule Google --project_id "" „Podczas jego uruchomienia możemy przejść do GCP, aby sprawdzić, czy wiadomości są przybywa. Przejdź do Pub/Sub -> subskrypcje -> nasza nowa subskrypcja (tutaj tweets-test-sub) -> wiadomości i kliknij POCIĄGNĄĆ. Powinieneś zobaczyć wiadomości odpowiadające nowo utworzonym tweetom, które są połączone ze słowem kluczowym ustawionym w reguła strumienia Zmienna w skrypcie.

Jak przetwarzać dane przesyłane strumieniowo za pomocą Apache Beam?
Do przetwarzania danych przesyłanych strumieniowo użyjemy Apache Beam, który jest ujednoliconym modelem programowania do wykonywania pociągów przetwarzania danych, zarówno wsadowych, jak i strumieniowych. Utworzony potok wykorzystuje temat Pub/Sub jako dane wejściowe i wyprowadza dane do dwóch tabel BigQuery:
- surowe tweety - tweetuj tekst z metadanymi i informacjami o autorze
- liczba poziomów minut - całkowita liczba tweetów pogrupowanych według języka minuty i tweeta
Kod rurociągu jest dostępny na Github: https://github.com/klepacz/tweet-streaming/blob/main/pipeline.py Przed uruchomieniem rurociągu musimy utworzyć nowy zestaw danych w BigQuery, aby przechowywać nasze dane wyjściowe. Aby to zrobić, przejdź do BigQuery w konsoli Google Cloud, a następnie kliknij trzy kropki obok nazwy projektu i naciśnij Utwórz zbiór danych. Nazwij to Twitter_dane i kliknij Utwórz zbiór danych.Po utworzeniu modułu rurociągu i zestawu danych BQ możesz przetestować rozwiązanie, uruchamiając Plik pipeline.py skrypt w jednym terminalu za pomocą następującego polecenia:python3 pipeline.py\
<yout-project_id>--projekt_id ""\
<yout-project_id><your-topic-id>--input_topic „projekty/ /topics/ „i Plik stream_to_pubsub.py W innym terminalu. Po kilku minutach możemy sprawdzić tabele wyjściowe, aby zobaczyć wyniki. Powinniśmy utworzyć dwie tabele BigQuery: liczba poziomów minut a surowe tweety. Możesz zachować skrypty przez kilka minut, aby zobaczyć, w jaki sposób nowe dane są dołączane do tabel. liczba poziomów minut tabela powinna wyglądać tak, możemy zobaczyć ile tweetów pojawiało się co minutę w każdym języku:

Uwaga Zauważ, że liczba poziomów minut tabela jest aktualizowana co minutę, a znacznik czasu kolumna zmienia się w odstępie jednej minuty. Jest to spowodowane konfiguracją okna w rurociągu Beam, która ma ustalony rozmiar 60 sekund. Możesz zmienić rozmiar okna wartość w kodzie pociągu, aby dostosować przedział agregacji danych.
Wdrożenie rozwiązania w Google Cloud
Teraz nasze rozwiązanie powinno działać poprawnie, ale co jeśli chcemy uruchamiać strumieniowanie tweetów przez dłuższy czas, np. tydzień? Cóż, możemy użyć Google Cloud do wdrożenia i dalej badać jej możliwości.
Wdrażanie usługi przesyłania strumieniowego Pub/Sub
Aby wdrożyć moduł do wysyłania tweetów do Pub/Sub, możemy po prostu utworzyć maszynę wirtualną w silniku obliczeniowym i uruchomić nasz Plik stream_to_pubsub.py skrypt tam. Aby to zrobić, przejdź do silnika obliczeniowego -> Instancje maszyny wirtualnej -> Utwórz instancję, a następnie wybierz nazwę i rozmiar maszyny. Tutaj jest nazwany strumieniowanie tweetów i dla typu maszyny E2-mikro został wybrany, ponieważ nie spodziewamy się dużego obciążenia pracą w naszym przypadku.

Następnie przewiń w dół do Dostęp do tożsamości i API i wybierz to samo konto usługi, które utworzyłeś wcześniej (w przykładzie jest test na Twitterze). Musimy to określić, ponieważ domyślne konto usługi nie ma dostępu do Pub/Sub i nie będziemy mogli uruchomić naszego skryptu.

Teraz możemy iść naprzód i uderzyć Stwórz a po kilku minutach możemy połączyć się z maszyną wirtualną, klikając SSH pod kolumną połączyć.Następnie musimy zainstalować nadzwyczajny uruchamiając następujące polecenia na maszynie:sudo apt-get updatesudo apt-get -