Na stránkách cloudsvět už jsme mluvili o time-series datech typických pro IoT senzory a procházeli možnosti ukládání těchto dat. Zmínil jsem, že existuje víc variant a všechny mají smysl – wide-column NoSQL jako Cassandra, NewSQL Vertica nebo i klasické relační databáze. Existuje ale i kategorie specialistů právě na tento typ dat a zkusil jsem se rámcově seznámit s tím nejznámějším – InfluxDB.
Spouštíme InfluxDB
Pokud stejně jako já nepotřebujete znát veškeré detaily InfluxDB a spíše vás zajímá získání základní představy, instalovat se vám nic chtít nebude. Z toho důvodu jsem jednoduše vzal Docker hostitele a využi hotového kontejnerového obrazu. Jen upozorňuji, že jsem se ani neobtěžoval namapovat Volume, takže perzistence dat tam realisticky vzato není. Pro praxi doporučuji použít například Docker Volume drivery pro HPE storage.
Spoustíme InfluxDB:
sudo docker network create mynet sudo docker run -d -p 8083:8083 -p 8086:8086 --name influx --net=mynet influxdb:1.0.0-rc2
Ta síť je příprava na věci budoucí, zatím ji neřešme. Přes Docker exec můžeme skočit rovnou do příkazové řádky InfluxDB:
$ sudo docker exec -ti influx influx Visit https://enterprise.influxdata.com to register for updates, InfluxDB server management, and monitoring. Connected to http://localhost:8086 version 1.0.0~rc2 InfluxDB shell version: 1.0.0~rc2 >
První kroky v InfluxDB
Založte novou databázi.
> create database mojedata > use mojedata Using database mojedata
Vložme do ní nějaká data. V příkazu insert na prvním místě figuruje measurement, tedy hlavní ukazatel, třeba teplota, vlhkost apod. Pak může následovat seznam indexovaných informací (tedy takových, podle kterých pak lze vyhledávat či agregovat), což typicky bude třeba lokalita, patro, vlastník apod. Následují samotné naměřené hodnoty. My uděláme něco velmi jednoduchého a při zadávání prosím počkejte vždy pár vteřin (ve výchozím stavu se časové razítko přiřadí automaticky, ale mohli bychom ho i explicitně sami definovat).
> insert pokus value=10 > insert pokus value=20 > insert pokus value=40
Teď si data ukážeme.
> select * from pokus name: pokus ----------- time value 1473221455280176396 10 1473221465775693211 20 1473221471573769275 40
Ve sloupečku time je časové razítko v Unix formátu (myslím že to je počet nanosekund od prvního dne roku 1970) a sloupeček value zobrazuje “naměřené” hodnoty.
Vyzkoušejme si schopnosti InfluxDB agregovat data. Jaké jsou průměrné hodnoty v pětiminutových oknech za posledních 10 minut?
> select mean("value") from pokus where time > Now() - 10m group by time(5m) name: pokus ----------- time mean 1473220800000000000 1473221100000000000 1473221400000000000 23.333333333333332
Všechny naše naměřené hodnoty spadají do stejného intervalu, takže vidíme jeden výsledek. Naše období posledních 10 minut ale zahrnuje i okna bez hodnoty (nic jsme “nenaměřili”).
Co kdybychom potřebovali údaje v oknech 15 vteřin? Senzor nám například může posílat údaje jen v okamžiku změny, pokud je vše při starém, mlčí. Jenže když například chceme kreslit graf v rozlišení 15 vteřin, tak tam rozhodně nemůžeme zanést nulu nebo nechat prázdné místo. Ukažme si, jak InfluxDB dokáže naše časové údaje zpracovat do 15-vteřinových intervalů. Pokud je v nich víc hodnot, chceme průměr (nebo to může být minimum, maximum, percentil i složitější funkce). Pokud tam naopak nebude nic, tak ať se použije poslední známá hodnota.
> select mean("value") from pokus where time > Now() - 10m group by time(15s) fill(previous) name: pokus ----------- time mean 1473221355000000000 1473221370000000000 1473221385000000000 1473221400000000000 1473221415000000000 1473221430000000000 1473221445000000000 10 1473221460000000000 30 1473221475000000000 30 1473221490000000000 30 1473221505000000000 30 1473221520000000000 30 1473221535000000000 30 1473221550000000000 30 1473221565000000000 30 ...
Více dat
Chtěl jsem pro zkoušení posbírat více dat a použil jsem veřejně dostupnou službu, která reportuje teplotu v různých lokalitách (Open Weather). Pojďme si napsat primitivní Python aplikaci, která v pravidelných intervalech stáhne teplotu z Open Weather a zapíše ji do InfluxDB:
import requests import json import os import time from influxdb import InfluxDBClient open_weather_api = os.environ['OPEN_WEATHER_API'] influx_host = os.environ['INFLUX_HOST'] influx_db = os.environ['INFLUX_DB'] url = 'http://api.openweathermap.org/data/2.5/weather' query = {'q': 'Prague', 'units': 'metric', 'APPID': open_weather_api} temp = 0.0 influx = InfluxDBClient(host=influx_host,database=influx_db) influx.create_database(influx_db) while True: try: response = requests.get(url, params=query) temp = json.loads(response.content)['main']['temp'] json_body = [{ "measurement": "temp", "tags": { "location": "Prague" }, "fields": { "value": temp } } ] influx.write_points(json_body) print "Value %s stored" % json.loads(response.content)['main']['temp'] time.sleep(10) except: pass
Skript se bere vstupní parametry z proměnných prostředí. Důvodem je, abychom mohli jednoduše kontejnerizovat s tím, že při spuštění kontejneru mu předáme parametry (především vaše Open Weather API, které získáte při registraci zdarma). Naše prostředí pak spustíme takhle:
sudo docker network create mynet sudo docker run -d -p 8083:8083 -p 8086:8086 --name influx --net=mynet influxdb:1.0.0-rc2 sudo docker run --name weather -d --net=mynet -e OPEN_WEATHER_API=3368c59a1410e0f8dec93df66b9ad945 -e INFLUX_HOST=influx -e INFLUX_DB=mydata tomaskubica/weather
Toto jsem spustil v Docker hostiteli v Azure a přišel druhý den. Podívejme se na pár příkladů v InfluxDB.
Zobrazíme kousek naměřených dat.
$ sudo docker exec -ti influx influx InfluxDB shell version: 1.0.0~rc2 > use mydata Using database mydata > select * from temp limit 10 name: temp ---------- time location value 1473150082681669758 Prague 16.6 1473150092796237754 Prague 16.6 1473150104041043443 Prague 16.29 1473150114202212828 Prague 16.29 1473150125251899460 Prague 16.6 1473150135287900322 Prague 16.6 1473150145318448463 Prague 16.6 1473150155353470826 Prague 16.29 1473150165388163474 Prague 16.6 1473150175427310780 Prague 16.29
Ukažme si naměřené hodnoty za poslední den, ale agregované (podle průměru) do časových oken jedné hodiny.
> select mean(value) from temp where time > Now() - 1d group by time(1h) name: temp ---------- time mean 1473134400000000000 1473138000000000000 1473141600000000000 1473145200000000000 1473148800000000000 16.797304347826135 1473152400000000000 17.9424581005587 1473156000000000000 18.686629213483243 1473159600000000000 19.39603351955298 1473163200000000000 20.077893258426858 1473166800000000000 20.89436974789912 1473170400000000000 21.26362359550558 1473174000000000000 21.49539106145258 1473177600000000000 21.364589235127454 1473181200000000000 20.209747899159474 1473184800000000000 19.33930167597761 1473188400000000000 18.42025210084034 1473192000000000000 17.66736694677876 1473195600000000000 17.045210084033624 1473199200000000000 16.710865921787615 1473202800000000000 16.4463509749303 1473206400000000000 16.40391061452511 1473210000000000000 16.400418994413336 1473213600000000000 16.172430167597867 1473217200000000000 15.798263305322127 1473220800000000000 15.173239436619744
Máme tedy hodně granulární data, ale jednoduše je můžeme agregovat. Co když budeme chtít po nějaké krátké období držet plná data, ale po pěti dnech už nám stačí jen informace agregované po hodině? Takovéto politiky můžete automatizovat. V naší původní sestavě nasatavíme retenční čas jen na pět dní a založíme kontinuální dotaz, který každou hodinu data zagreguje a zapíše do jiné datové struktury.
CREATE CONTINUOUS QUERY mydownsample ON mydata BEGIN SELECT mean(value) INTO hour_temp FROM temp GROUP BY time(1h) END