diff --git a/kafka_2.13-3.9.0.tgz b/test_dockerfiles/kafka_2.13-3.9.0.tgz
similarity index 100%
rename from kafka_2.13-3.9.0.tgz
rename to test_dockerfiles/kafka_2.13-3.9.0.tgz
diff --git a/test_dockerfiles/kafka_consumer.py b/test_dockerfiles/kafka_consumer.py
new file mode 100644
index 0000000000000000000000000000000000000000..a546e4c9b26d6047be54277a0c85f909e3cef2b5
--- /dev/null
+++ b/test_dockerfiles/kafka_consumer.py
@@ -0,0 +1,36 @@
+from confluent_kafka import Consumer, KafkaError, KafkaException
+import json
+
+# Create Confluent Kafka consumer
+consumer = Consumer({
+    'bootstrap.servers': 'localhost:9092',
+    'group.id': 'my-group',
+    'auto.offset.reset': 'earliest'
+})
+
+# Subscribe to 'sensor-data' topic
+consumer.subscribe(['sensor-data'])
+
+# Consume messages from 'sensor-data' topic
+try:
+    while True:
+        msg = consumer.poll(1.0)  # Poll for messages with a timeout of 1 second
+
+        if msg is None:
+            continue
+        if msg.error():
+            if msg.error().code() == KafkaError._PARTITION_EOF:
+                # End of partition event
+                print(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}")
+            else:
+                raise KafkaException(msg.error())
+        else:
+            # Proper message
+            sensor_data = json.loads(msg.value().decode('utf-8'))  # Deserialize the JSON data
+            print(f"Received: {sensor_data}")
+            # You can process the data here (e.g., analyze, store in a DB)
+except KeyboardInterrupt:
+    print("Stopped by user")
+finally:
+    # Close down consumer to commit final offsets.
+    consumer.close()
\ No newline at end of file
diff --git a/test_dockerfiles/kafka_producer.py b/test_dockerfiles/kafka_producer.py
new file mode 100644
index 0000000000000000000000000000000000000000..22efac718eceb4d08eafb707fa12de04a3bb2a9b
--- /dev/null
+++ b/test_dockerfiles/kafka_producer.py
@@ -0,0 +1,33 @@
+from confluent_kafka import Producer
+import random
+import json
+import time
+
+# Delivery report callback
+def delivery_report(err, msg):
+    if err is not None:
+        print(f"Message delivery failed: {err}")
+    else:
+        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
+
+# Create Confluent Kafka producer
+producer = Producer({'bootstrap.servers': 'localhost:9092'})
+
+# Generate random sensor data and send to 'sensor-data' topic
+try:
+    while True:
+        sensor_data = {
+            "sensor_id": random.randint(1, 10),  # Simulating sensor ID
+            "temperature": random.uniform(20.0, 30.0),  # Random temperature value
+            "humidity": random.uniform(30.0, 80.0)  # Random humidity value
+        }
+
+        # Send data to 'sensor-data' topic
+        producer.produce('sensor-data', key=str(sensor_data["sensor_id"]), value=json.dumps(sensor_data), callback=delivery_report)
+        producer.poll(0)  # Serve delivery callback queue
+
+        time.sleep(2)  # Simulate data being sent every 2 seconds
+except KeyboardInterrupt:
+    print("Stopped by user")
+finally:
+    producer.flush()
\ No newline at end of file
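
Neither script creates the 'sensor-data' topic. If the target broker is not configured for topic auto-creation, a minimal sketch like the one below could create it first using confluent-kafka's AdminClient; the broker address and the single-partition, replication-factor-1 settings mirror the scripts above and are assumptions for local testing, not part of this change.

    from confluent_kafka.admin import AdminClient, NewTopic

    # Assumption: same local broker the producer/consumer scripts point at.
    admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

    # One partition, replication factor 1: only suitable for local testing.
    futures = admin.create_topics([NewTopic('sensor-data', num_partitions=1, replication_factor=1)])
    for topic, future in futures.items():
        try:
            future.result()  # Block until the broker responds; raises if creation failed
            print(f"Created topic {topic}")
        except Exception as e:
            print(f"Topic {topic} not created: {e}")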