event = json.loads(msg.value().decode('utf-8'))
# Фильтруем события с URL, содержащими "product"
if 'product' in event['url']:
print(f"Фильтруем событие: {event}")
produce_filtered_event(event)
except KeyboardInterrupt:
print("Завершение работы.")
finally:
consumer.close()
```
Объяснение:
– Консьюмер читает события из топика `clickstream`.
– Каждое сообщение проверяется на наличие слова "product" в поле `url`.
– Отфильтрованные события отправляются в новый топик `filtered_clicks` через продюсера.
Задача 2: Подсчет количества событий в реальном времени
Описание:
Топик `log_events` содержит логи системы. Каждое сообщение содержит:
– `log_level` (например, "INFO", "ERROR", "DEBUG").
– `message` (текст лога).
Ваша задача: написать программу, которая считает количество событий уровня "ERROR" в реальном времени и каждые 10 секунд выводит их общее количество.
Решение:
```python
from confluent_kafka import Consumer
import time
# Настройки Kafka
broker = 'localhost:9092'
# Создание консьюмера
consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'log-group',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['log_events'])
error_count = 0
start_time = time.time()
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
# Преобразуем сообщение в Python-объект
log_event = json.loads(msg.value().decode('utf-8'))
# Увеличиваем счетчик, если уровень лога "ERROR"
if log_event['log_level'] == 'ERROR':
error_count += 1
# Каждые 10 секунд выводим текущий счетчик
if time.time() – start_time >= 10:
print(f"Количество ошибок за последние 10 секунд: {error_count}")
error_count = 0
start_time = time.time()
except KeyboardInterrupt:
print("Завершение работы.")
finally:
consumer.close()
```
Объяснение:
– Консьюмер читает события из топика `log_events`.
– Если уровень лога "ERROR", увеличивается счетчик `error_count`.
– Каждые 10 секунд программа выводит количество событий "ERROR" и сбрасывает счетчик.
Задача 3: Агрегация данных по группам
Описание:
Топик `transactions` содержит данные о финансовых транзакциях:
– `user_id` – идентификатор пользователя.
– `amount` – сумма транзакции.
Ваша задача: написать программу, которая подсчитывает общую сумму транзакций для каждого пользователя и выводит результаты в реальном времени.
Решение:
```python
from confluent_kafka import Consumer
import json
from collections import defaultdict
# Настройки Kafka
broker = 'localhost:9092'
# Создание консьюмера
consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'transaction-group',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['transactions'])
# Словарь для хранения сумм по пользователям
user_totals = defaultdict(float)
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
# Преобразуем сообщение в Python-объект
transaction = json.loads(msg.value().decode('utf-8'))
# Обновляем сумму для пользователя
user_id = transaction['user_id']
user_totals[user_id] += transaction['amount']
# Вывод текущих сумм
print(f"Текущая сумма транзакций по пользователям: {dict(user_totals)}")
except