Corona-Daten des RKI mit Python herunterladen
Auch Ende 2020 hält uns Corona fest im Griff. Vor allem vor den Feiertagen habe ich mir die Frage gestellt, wie sinnvoll es ist, wenn Personen aus unterschiedlichen Landesteilen zusammenkommen. Ich wollte also auf einen Blick für meine Familie sehen können, wie sich die Lage in den jeweiligen Landkreisen entwickelt. Idealerweise automatisiert jeden Morgen auf meinem Handy - ohne die völlig benutzerunfreundlichen Veröffentlichungen des RKI durchstöbern zu müssen.
Die Idee war geboren. Ein Python-Skript sollte mir jede Nacht die Daten vom RKI abziehen, in einer Grafik aufbereiten und via Telegram an meine Familie verteilen. Wenn die Muse anhält, werde ich in den nächsten Tagen also kurz beschreiben, wie ich Daten vom RKI mit Python abziehe, diese mit Plotly grafisch aufbereite und schließlich in einen Telegram-Kanal jeden Morgen poste.
Beginnen wir mit dem Abziehen der Daten aus dem NPGEO Corona Hub. Das RKI veröffentlicht die Daten - aus meiner Sicht sehr dürftig dokumentiert - über arcgis.com, das mittels API von Python aus angesprochen werden kann. Wir benötigen zunächst also die URL.
Da ich mich für Daten auf Landkreisebene interessiere, öffnen wir https://npgeo-corona-npgeo-de.hub.arcgis.com/search?collection=Dataset und wählen den Landkreis-Datensatz aus. Über den API-Explorer-Tab können wir Filter setzen, die relevanten Felder auswählen und die sich ergebende URL kopieren.
Dann bereiten wir unser Python-Skript vor, indem wir die notwendigen Bibliotheken laden.
- mit
jsonundrequestsladen wir die Daten pandasvereinfacht die Aufräum-Arbeit in den heruntergeladenen Datenosunddatetimehelfen uns die Daten in Unterordnern und mit Datumsstempeln zu speichern
1# imports
2import json
3import os
4import pandas as pd
5import requests
6
7from datetime import datetime
8from pandas.io.json import json_normalizeIm nächsten Schritt definieren wir ein paar Variablen.
api_urlist die API-Query-URL, die wir von arcgis.com kopiert haben.app_dirunddata_direnthalten den Dateipfad zum Verzeichnis, in dem die Python-Datei liegt, sowie einem Unterordnerdata. In letzterem werden wir die heruntergeladenen Daten speichern.
1# define variables
2api_url = r'https://services7.arcgis.com/mOBPykOjAyBO2ZKk/arcgis/rest/services/RKI_Landkreisdaten/FeatureServer/0/query?where=1%3D1&outFields=*&outSR=4326&f=json'
3app_dir = os.path.abspath(os.path.dirname(__file__))
4data_dir = app_dir + '/data/'Der eigentliche Download verfolgt über eine Kombination von json.loads() mit requests.get(). Mittels json_normalize() wandeln wir das heruntergeladen JSON-File in einen Pandas-Datensatz um.
1# download and normalize
2d = json.loads(requests.get(api_url).text)
3df = json_normalize(d, record_path=['features'])Perfekt. Mittels df.head() können wir einen Blick auf die Daten werfen. Mir persönlich gefällt das attributes.-Präfix bei den Spaltennamen nicht. Lass es uns also entfernen. Dafür erzeugen wir ein leeres Array, schleifen über die Spaltennamen, ersetzen dabei jeweils “attributes.” mit "" und überschreiben schließlich die Spaltennamen mit dem Array. Es gibt dafür sicher einen besseren Weg.
1# remove "attributes." from field names
2colnames = []
3for i in range(len(df.columns)):
4 colnames.append(df.columns[i].replace('attributes.', ''))
5
6df.columns = colnamesDas RKI veröffentlicht über die API lediglich die neusten Daten. Sofern wir einen Zeitverlauf analysieren wollen, müssen wir die Daten also abspeichern. Der Einfachheit halber sichern wir den heruntergeladenen Datensatz als CSV mittels Pandas to_csv()-Funktion. Damit wir die Dateien den einzelnen Tagen zuordnen können, extrahieren wir aus der Spalte last_update das Datum und bauen das in den Namen der exportierten CSV-Datei ein.
1# extract last update date (for filename)
2file_date = datetime.strptime(df['last_update'][0], '%d.%m.%Y, %H:%M Uhr').strftime("%Y%m%d_%H%M")
3
4# epxort CSV to data directory, using last update date for filename
5df.to_csv(
6 path_or_buf = data_dir + "rki_" + file_date + ".csv",
7 sep = ";",
8 header = True,
9 index = False,
10 encoding = 'utf-8'
11)Und damit sind wir eigentlich auch schon fertig. Damit ich nicht für jede Anwendung die Daten neu einlesen und zusammenfügen muss, habe ich meinem Python-Skript noch einen letzten Absatz mitgegeben. Dieser erzeugt zunächst eine Liste aller unter data/ abgespeicherten Dateien (deren Dateiname mit rki_ beginnt), liest diese anschließend der Reihe nach ein, fügt sie zu einem Datensatz zusammen und speichert diesen als zusätzliches CSV-File ab.
1# create list of all exported CSV files
2filepaths = [f for f in os.listdir((data_dir)) if f.startswith('rki_')]
3filepaths
4
5# append exported CSV files
6rkitmp = []
7for f in filepaths:
8 dftmp = pd.read_csv(data_dir + f, sep = ";", decimal=",")
9 rkitmp.append(dftmp)
10
11# concat to Pandas data frame
12rki = pd.concat(rkitmp, axis=0, ignore_index=True, sort = True)
13rki = rki.drop_duplicates()
14
15# export combined data
16rki.to_csv(
17 path_or_buf = data_dir + "rki.csv",
18 sep = ";",
19 header = True,
20 index = False,
21 encoding = 'utf-8'
22)