Перейти к основному содержимому

Синхронизация данных

Посты

Данные в постах мониторинга — это усредненные данные. Рассмотрим двадцатиминутные усреднения на устройстве с частотой измерения 1 минута. Это означает, что данные придут в конечное состояние только после получения всех двадцати пакетов от устройства. После получения сервером каждого пакета из двадцатиминутного интервала усредненные данные будут изменяться.

Что это означает на практике?

Это означает, что синхронизация данных по диапазонам дат может происходить с ошибкой. Для решения этих проблем мы предлагаем следующий способ хранения и обновления данных и получения их при помощи API.

  1. Уникальным ключом данных при таком подходе является пара (идентификатор поста, время усреднения).
  2. Для отслеживания изменения усреднений уже полученных данных мы предлагаем использовать поле version. Version — версия усредненных данных, которая:
    1. монотонно возрастает для всей базы данных;
    2. изменяется в случае обновления данных для какой-то отметки времени.
  3. Синхронизация осуществляется путем постраничного запроса, в параметрах которого фигурирует последнее значение версии, сохраненной в базе данных (параметр запроса - ?version__gt=). Для ключа (пост, время усреднения) возможно получить разные значения измерений при повторных запросах. Таким образом, при синхронизации необходимо проверять, нет ли ранее сохраненных данных для ключа (№ поста, дата усреднения).

Пример ситуации, когда значения ранее полученного усреднения могут поменяться при следующем запросе.

  1. В интервале времени от 16:00 до 16:10 Пост №1 получил 10 пакетов от станции, которая отправляет данные раз в минуту.
  2. При вызвове API в 16:10 вернутся усредненные данные, приведенные к концу интервала:
    1. Пост №1.
    2. Время - 16:20.
    3. Версия - 101.
    4. Среднее значение какого-то показателя - x.
  3. Пост продолжает получать данные с 16:10 до 16:20 — еще 10 пакетов.
  4. При вызове API в 16:20 с параметром «Версия 101» вернется:
    1. Пост №1.
    2. Время — 16:20.
    3. Версия — 102.
    4. Среднее значение того же показателя — y. В этом случае нужно по ключу (Пост №1, время 16:20) проверить наличие данных, если они есть (как в нашем случае) - обновить. Иначе просто добавить данные. Также нужно сохранить последнее значение версии (102) для последующих вызовов API.

Описанная ситуация может возникнуть также в случае сетевых проблем на устройстве. После исправления проблемы устройство отправит ранее измеренные величины при восстановлении связи, и данные усреднений изменятся.

Пример сохранения данных для django-framework


def save_packet(date, post_id, version, measurements):
packet = Packet.objects.filter.get(date=date, post_id=post_id)
if packet is None:
Packet.objects.create(date=date, post_id=post_id,version=version, measurements=measurements)
else:
packet.measurements = measurements
packet.save()

В приведенном примере для получения version для следующих запросов нужно будет выбрать максимальное значение из сохраненных.

Пример получения данных из API с использованием постраничного вывода.


import os
import json

import requests

post_list_url = 'https://mycityair.ru/harvester/v2/Posts'
api_key = os.getenv('CITYAIR_TOKEN') # or set your api key directly

assert api_key, 'API key is empty'

headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}',
}


def make_request(url):
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()


def pretty_print(data: dict):
formatted_response = json.dumps(data, indent=2, ensure_ascii=False)
print(formatted_response)


def build_measurement_url(post_id, version, limit):
interval = '1h'
return f'https://mycityair.ru/harvester/v2/Posts/{post_id}/measurements?' \
f'interval={interval}&' \
f'limit={limit}&' \
f'version__gt={version}'


posts = make_request(post_list_url)
assert len(posts) > 0, 'At least one post is required'
post = posts[0]
print(f'Got post {post["id"]}:{post["name"]}')

last_version_id = 1 # for the first sync process
for i in range(5): # five iterations, just for an example
url = build_measurement_url(post['id'], last_version_id, limit=2)
response = make_request(url)
print(f'Iteration #{i}')
if len(response['data']) < 0:
print("No new data on server")
break
pretty_print(response['data'])
for packet in response['data']:
# save data
pass
last_version_id = max(map(lambda d: d['version'], response['data']))

Станции

Данные с устройств передаются в том виде, в каком были получены. В отличие от постов данные передаются без усреднений.

На текущий момент возможна синхронизация только по диапазону дат. При этом возможна ситуация, когда позже на сервере появятся данные из ранее запрошенного интервала дат.

import os
import json
import datetime

import requests

station_list_url = 'https://mycityair.ru/harvester/v2/Stations'
api_key = os.getenv('CITYAIR_TOKEN') # or set your api key directly

assert api_key, 'API key is empty'

headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}',
}


def make_request(url):
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()


def pretty_print(data: dict):
formatted_response = json.dumps(data, indent=2, ensure_ascii=False)
print(formatted_response)


def build_measurement_url(station_id, date_start, date_end):
return f'https://mycityair.ru/harvester/v2/Stations/{station_id}/measurements?' \
f'date__gt={start_date.isoformat()}&' \
f'date__lt={end_date.isoformat()}'


stations = make_request(station_list_url)
assert len(stations) > 0, 'At least one station is required'
station = stations[0]
print(f'Got station {station["id"]}:{station["name"]}')

hour = datetime.timedelta(hours=1)
start_date = datetime.datetime.utcnow() - datetime.timedelta(hours=300)
end_date = start_date + hour


for i in range(5): # do five sync iterations
url = build_measurement_url(station['id'], start_date, end_date)
response = make_request(url)
print(f'Iteration #{i}')
if len(response['data']) < 0:
print("No new data on server")
break
pretty_print(response['data'])
for packet in response['data']:
# save data
pass
start_date += hour
end_date += hour