Skip to content
Snippets Groups Projects
Commit f7c57185 authored by Dean Jukes's avatar Dean Jukes
Browse files

Kafka / python set up

parent 31e0a839
No related branches found
No related tags found
No related merge requests found
*.tgz
*.zip
*.tar.gz
*.tar
\ No newline at end of file
## SETTING UP PYTHON ENVIRONMENT
- Activate venv
[] source virtual-env/bin/activate
- Install packages
[] pip install -r requirements.txt
## SETTING UP KAFKA SERVER
- Download Kafka server
[] wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
[] tar -xzf kafka_2.13-3.9.0.tgz
[] cd kafka_2.13-3.9.0
- Setup env variables
[] KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh dragons-1)"
[] bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties
- Start server
[] bin/kafka-server-start.sh config/kraft/reconfig-server.properties
\ No newline at end of file
from confluent_kafka import Consumer, KafkaException
import json
# Create Confluent Kafka consumer
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'auto.offset.reset': 'earliest'
})
# Subscribe to 'sensor-data' topic
consumer.subscribe(['sensor-data'])
# Consume messages from 'sensor-data' topic
try:
while True:
msg = consumer.poll(1.0) # Poll for messages with a timeout of 1 second
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
print(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}")
elif msg.error():
raise KafkaException(msg.error())
else:
# Proper message
sensor_data = json.loads(msg.value().decode('utf-8')) # Deserialize the JSON data
print(f"Received: {sensor_data}")
# You can process the data here (e.g., analyze, store in a DB)
except KeyboardInterrupt:
print("Stopped by user")
finally:
# Close down consumer to commit final offsets.
consumer.close()
\ No newline at end of file
from confluent_kafka import Producer
import random
import json
import time
# Delivery report callback
def delivery_report(err, msg):
if err is not None:
print(f"Message delivery failed: {err}")
else:
print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
# Create Confluent Kafka producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
# Generate random sensor data and send to 'sensor-data' topic
try:
while True:
sensor_data = {
"sensor_id": random.randint(1, 10), # Simulating sensor ID
"temperature": random.uniform(20.0, 30.0), # Random temperature value
"humidity": random.uniform(30.0, 80.0) # Random humidity value
}
# Send data to 'sensor-data' topic
producer.produce('sensor-data', key=str(sensor_data["sensor_id"]), value=json.dumps(sensor_data), callback=delivery_report)
producer.poll(0) # Serve delivery callback queue
time.sleep(2) # Simulate data being sent every 2 seconds
except KeyboardInterrupt:
print("Stopped by user")
finally:
producer.flush()
\ No newline at end of file
confluent-kafka
prometheus-client
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment