From f7c57185063827dc0e7ab8e1e033fb5bd6da0682 Mon Sep 17 00:00:00 2001
From: Dean Jukes <jukesd1@cardiff.ac.uk>
Date: Thu, 5 Dec 2024 15:20:31 +0000
Subject: [PATCH] Kafka / python set up

---
 .gitignore        |  4 ++++
 commands.md       | 27 +++++++++++++++++++++++++++
 kafka.py          |  0
 kafka_consumer.py | 36 ++++++++++++++++++++++++++++++++++++
 kafka_producer.py | 33 +++++++++++++++++++++++++++++++++
 requirements.txt  |  2 ++
 6 files changed, 102 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 commands.md
 delete mode 100644 kafka.py
 create mode 100644 kafka_consumer.py
 create mode 100644 kafka_producer.py
 create mode 100644 requirements.txt

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..c6346cb
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+*.tgz
+*.zip
+*.tar.gz
+*.tar
\ No newline at end of file
diff --git a/commands.md b/commands.md
new file mode 100644
index 0000000..56d8769
--- /dev/null
+++ b/commands.md
@@ -0,0 +1,27 @@
+
+## SETTING UP PYTHON ENVIRONMENT
+
+- Activate venv
+
+[] source virtual-env/bin/activate
+
+- Install packages
+
+[] pip install -r requirements.txt
+
+## SETTING UP KAFKA SERVER
+
+- Download Kafka server
+
+[] wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
+[] tar -xzf kafka_2.13-3.9.0.tgz
+[] cd kafka_2.13-3.9.0
+
+- Setup env variables
+
+[] KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
+[] bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties
+
+- Start server
+
+[] bin/kafka-server-start.sh config/kraft/reconfig-server.properties
\ No newline at end of file
diff --git a/kafka.py b/kafka.py
deleted file mode 100644
index e69de29..0000000
diff --git a/kafka_consumer.py b/kafka_consumer.py
new file mode 100644
index 0000000..5b96aa1
--- /dev/null
+++ b/kafka_consumer.py
@@ -0,0 +1,36 @@
+from confluent_kafka import Consumer, KafkaError, KafkaException
+import json
+
+# Create Confluent Kafka consumer
+consumer = Consumer({
+    'bootstrap.servers': 'localhost:9092',
+    'group.id': 'my-group',
+    'auto.offset.reset': 'earliest'
+})
+
+# Subscribe to 'sensor-data' topic
+consumer.subscribe(['sensor-data'])
+
+# Consume messages from 'sensor-data' topic
+try:
+    while True:
+        msg = consumer.poll(1.0)  # Poll for messages with a timeout of 1 second
+
+        if msg is None:
+            continue
+        if msg.error():
+            if msg.error().code() == KafkaError._PARTITION_EOF:
+                # End of partition event
+                print(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}")
+            else:
+                raise KafkaException(msg.error())
+        else:
+            # Proper message
+            sensor_data = json.loads(msg.value().decode('utf-8'))  # Deserialize the JSON data
+            print(f"Received: {sensor_data}")
+            # You can process the data here (e.g., analyze, store in a DB)
+except KeyboardInterrupt:
+    print("Stopped by user")
+finally:
+    # Close down consumer to commit final offsets.
+    consumer.close()
\ No newline at end of file
diff --git a/kafka_producer.py b/kafka_producer.py
new file mode 100644
index 0000000..22efac7
--- /dev/null
+++ b/kafka_producer.py
@@ -0,0 +1,33 @@
+from confluent_kafka import Producer
+import random
+import json
+import time
+
+# Delivery report callback
+def delivery_report(err, msg):
+    if err is not None:
+        print(f"Message delivery failed: {err}")
+    else:
+        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
+
+# Create Confluent Kafka producer
+producer = Producer({'bootstrap.servers': 'localhost:9092'})
+
+# Generate random sensor data and send to 'sensor-data' topic
+try:
+    while True:
+        sensor_data = {
+            "sensor_id": random.randint(1, 10),  # Simulating sensor ID
+            "temperature": random.uniform(20.0, 30.0),  # Random temperature value
+            "humidity": random.uniform(30.0, 80.0)  # Random humidity value
+        }
+
+        # Send data to 'sensor-data' topic
+        producer.produce('sensor-data', key=str(sensor_data["sensor_id"]), value=json.dumps(sensor_data), callback=delivery_report)
+        producer.poll(0)  # Serve delivery callback queue
+
+        time.sleep(2)  # Simulate data being sent every 2 seconds
+except KeyboardInterrupt:
+    print("Stopped by user")
+finally:
+    producer.flush()
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..b5ae790
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,2 @@
+confluent-kafka
+prometheus-client
\ No newline at end of file
--
GitLab