Библиотеки Python Часть 2. Практическое применение - страница 4



– Отфильтрованные события отправляются в новый топик `filtered_clicks` через продюсера.


Задача 2: Подсчет количества событий в реальном времени

Описание:

Топик `log_events` содержит логи системы. Каждое сообщение содержит:

– `log_level` (например, "INFO", "ERROR", "DEBUG").

– `message` (текст лога).

Ваша задача: написать программу, которая считает количество событий уровня "ERROR" в реальном времени и каждые 10 секунд выводит их общее количество.

Решение:

```python

from confluent_kafka import Consumer

import time

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'log-group',

'auto.offset.reset': 'earliest'

})

consumer.subscribe(['log_events'])

error_count = 0

start_time = time.time()

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект

log_event = json.loads(msg.value().decode('utf-8'))

# Увеличиваем счетчик, если уровень лога "ERROR"

if log_event['log_level'] == 'ERROR':

error_count += 1

# Каждые 10 секунд выводим текущий счетчик

if time.time() – start_time >= 10:

print(f"Количество ошибок за последние 10 секунд: {error_count}")

error_count = 0

start_time = time.time()

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

– Консьюмер читает события из топика `log_events`.

– Если уровень лога "ERROR", увеличивается счетчик `error_count`.

– Каждые 10 секунд программа выводит количество событий "ERROR" и сбрасывает счетчик.


Задача 3: Агрегация данных по группам

Описание:

Топик `transactions` содержит данные о финансовых транзакциях:

– `user_id` – идентификатор пользователя.

– `amount` – сумма транзакции.

Ваша задача: написать программу, которая подсчитывает общую сумму транзакций для каждого пользователя и выводит результаты в реальном времени.

Решение:

```python

from confluent_kafka import Consumer

import json

from collections import defaultdict

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'transaction-group',

'auto.offset.reset': 'earliest'

})

consumer.subscribe(['transactions'])

# Словарь для хранения сумм по пользователям

user_totals = defaultdict(float)

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект

transaction = json.loads(msg.value().decode('utf-8'))

# Обновляем сумму для пользователя

user_id = transaction['user_id']

user_totals[user_id] += transaction['amount']

# Вывод текущих сумм

print(f"Текущая сумма транзакций по пользователям: {dict(user_totals)}")

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

– Консьюмер читает данные из топика `transactions`.

– Для каждого пользователя обновляется сумма его транзакций в словаре `user_totals`.

– Программа выводит текущие суммы по всем пользователям.


Задача 4: Сохранение обработанных данных в файл

Описание:

Топик `sensor_data` содержит данные с датчиков IoT:

– `sensor_id` – идентификатор датчика.

– `temperature` – измеренная температура.

– `timestamp` – время измерения.

Ваша задача: написать программу, которая сохраняет все данные о температуре выше 30°C в файл `high_temp.json`.

Решение:

```python

from confluent_kafka import Consumer

import json

# Настройки Kafka

broker = 'localhost:9092'