From 63e37a1dadc766a44c9e61266657c1f465a6cbe1 Mon Sep 17 00:00:00 2001 From: Dean Jukes <jukesd1@cardiff.ac.uk> Date: Thu, 5 Dec 2024 16:49:29 +0000 Subject: [PATCH] freak --- .../kafka_2.13-3.9.0.tgz | Bin test_dockerfiles/kafka_consumer.py | 36 ++++++++++++++++++ test_dockerfiles/kafka_producer.py | 33 ++++++++++++++++ 3 files changed, 69 insertions(+) rename kafka_2.13-3.9.0.tgz => test_dockerfiles/kafka_2.13-3.9.0.tgz (100%) create mode 100644 test_dockerfiles/kafka_consumer.py create mode 100644 test_dockerfiles/kafka_producer.py diff --git a/kafka_2.13-3.9.0.tgz b/test_dockerfiles/kafka_2.13-3.9.0.tgz similarity index 100% rename from kafka_2.13-3.9.0.tgz rename to test_dockerfiles/kafka_2.13-3.9.0.tgz diff --git a/test_dockerfiles/kafka_consumer.py b/test_dockerfiles/kafka_consumer.py new file mode 100644 index 0000000..a546e4c --- /dev/null +++ b/test_dockerfiles/kafka_consumer.py @@ -0,0 +1,36 @@ +from confluent_kafka import Consumer, KafkaException +import json + +# Create Confluent Kafka consumer +consumer = Consumer({ + 'bootstrap.servers': 'localhost:9092', + 'group.id': 'my-group', + 'auto.offset.reset': 'earliest' +}) + +# Subscribe to 'sensor-data' topic +consumer.subscribe(['sensor-data']) + +# Consume messages from 'sensor-data' topic +try: + while True: + msg = consumer.poll(1.0) # Poll for messages with a timeout of 1 second + + if msg is None: + continue + if msg.error(): + if msg.error().code() == KafkaException._PARTITION_EOF: + # End of partition event + print(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}") + elif msg.error(): + raise KafkaException(msg.error()) + else: + # Proper message + sensor_data = json.loads(msg.value().decode('utf-8')) # Deserialize the JSON data + print(f"Received: {sensor_data}") + # You can process the data here (e.g., analyze, store in a DB) +except KeyboardInterrupt: + print("Stopped by user") +finally: + # Close down consumer to commit final offsets. + consumer.close() \ No newline at end of file diff --git a/test_dockerfiles/kafka_producer.py b/test_dockerfiles/kafka_producer.py new file mode 100644 index 0000000..22efac7 --- /dev/null +++ b/test_dockerfiles/kafka_producer.py @@ -0,0 +1,33 @@ +from confluent_kafka import Producer +import random +import json +import time + +# Delivery report callback +def delivery_report(err, msg): + if err is not None: + print(f"Message delivery failed: {err}") + else: + print(f"Message delivered to {msg.topic()} [{msg.partition()}]") + +# Create Confluent Kafka producer +producer = Producer({'bootstrap.servers': 'localhost:9092'}) + +# Generate random sensor data and send to 'sensor-data' topic +try: + while True: + sensor_data = { + "sensor_id": random.randint(1, 10), # Simulating sensor ID + "temperature": random.uniform(20.0, 30.0), # Random temperature value + "humidity": random.uniform(30.0, 80.0) # Random humidity value + } + + # Send data to 'sensor-data' topic + producer.produce('sensor-data', key=str(sensor_data["sensor_id"]), value=json.dumps(sensor_data), callback=delivery_report) + producer.poll(0) # Serve delivery callback queue + + time.sleep(2) # Simulate data being sent every 2 seconds +except KeyboardInterrupt: + print("Stopped by user") +finally: + producer.flush() \ No newline at end of file -- GitLab