diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..c6346cbc8d524caae816ae490f02ee2e27900579
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+*.tgz
+*.zip
+*.tar.gz
+*.tar
\ No newline at end of file
diff --git a/commands.md b/commands.md
new file mode 100644
index 0000000000000000000000000000000000000000..56d87695895716bf064207352c229466735592ed
--- /dev/null
+++ b/commands.md
@@ -0,0 +1,27 @@
+
+## SETTING UP PYTHON ENVIRONMENT
+
+- Activate venv
+
+[] source virtual-env/bin/activate
+
+- Install packages
+
+[] pip install -r requirements.txt
+
+## SETTING UP KAFKA SERVER
+
+- Download Kafka server
+
+[] wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
+[] tar -xzf kafka_2.13-3.9.0.tgz
+[] cd kafka_2.13-3.9.0
+
+- Set up env variables
+
+[] KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
+[] bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties
+
+- Start server
+
+[] bin/kafka-server-start.sh config/kraft/reconfig-server.properties
\ No newline at end of file
diff --git a/kafka.py b/kafka.py
deleted file mode 100644
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000
diff --git a/kafka_consumer.py b/kafka_consumer.py
new file mode 100644
index 0000000000000000000000000000000000000000..5b96aa14d1b5c7ee4a38d23d0e402e41c632f7c6
--- /dev/null
+++ b/kafka_consumer.py
@@ -0,0 +1,36 @@
+from confluent_kafka import Consumer, KafkaException, KafkaError
+import json
+
+# Create Confluent Kafka consumer
+consumer = Consumer({
+    'bootstrap.servers': 'localhost:9092',
+    'group.id': 'my-group',
+    'auto.offset.reset': 'earliest'
+})
+
+# Subscribe to 'sensor-data' topic
+consumer.subscribe(['sensor-data'])
+
+# Consume messages from 'sensor-data' topic
+try:
+    while True:
+        msg = consumer.poll(1.0)  # Poll for messages with a timeout of 1 second
+
+        if msg is None:
+            continue
+        if msg.error():
+            if msg.error().code() == KafkaError._PARTITION_EOF:
+                # End of partition event
+                print(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}")
+            else:
+                raise KafkaException(msg.error())
+        else:
+            # Proper message
+            sensor_data = json.loads(msg.value().decode('utf-8'))  # Deserialize the JSON data
+            print(f"Received: {sensor_data}")
+            # You can process the data here (e.g., analyze, store in a DB)
+except KeyboardInterrupt:
+    print("Stopped by user")
+finally:
+    # Close down consumer to commit final offsets.
+    consumer.close()
\ No newline at end of file
diff --git a/kafka_producer.py b/kafka_producer.py
new file mode 100644
index 0000000000000000000000000000000000000000..22efac718eceb4d08eafb707fa12de04a3bb2a9b
--- /dev/null
+++ b/kafka_producer.py
@@ -0,0 +1,33 @@
+from confluent_kafka import Producer
+import random
+import json
+import time
+
+# Delivery report callback
+def delivery_report(err, msg):
+    if err is not None:
+        print(f"Message delivery failed: {err}")
+    else:
+        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
+
+# Create Confluent Kafka producer
+producer = Producer({'bootstrap.servers': 'localhost:9092'})
+
+# Generate random sensor data and send to 'sensor-data' topic
+try:
+    while True:
+        sensor_data = {
+            "sensor_id": random.randint(1, 10),  # Simulating sensor ID
+            "temperature": random.uniform(20.0, 30.0),  # Random temperature value
+            "humidity": random.uniform(30.0, 80.0)  # Random humidity value
+        }
+
+        # Send data to 'sensor-data' topic
+        producer.produce('sensor-data', key=str(sensor_data["sensor_id"]), value=json.dumps(sensor_data), callback=delivery_report)
+        producer.poll(0)  # Serve delivery callback queue
+
+        time.sleep(2)  # Simulate data being sent every 2 seconds
+except KeyboardInterrupt:
+    print("Stopped by user")
+finally:
+    producer.flush()
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000000000000000000000000000000000000..b5ae7909bafc2ee812b0364190f69102e726fdb0
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,2 @@
+confluent-kafka
+prometheus-client
\ No newline at end of file
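Note: requirements.txt pins prometheus-client, but neither of the added scripts imports it yet. Below is a minimal sketch of how it could be wired into kafka_consumer.py; the metric names and port 8000 are assumptions for illustration, not part of this change.

```python
from prometheus_client import start_http_server, Counter, Gauge

# Hypothetical metric names; adjust to your own naming scheme.
MESSAGES_CONSUMED = Counter(
    "sensor_messages_consumed_total",
    "Number of sensor-data messages consumed from Kafka",
)
LAST_TEMPERATURE = Gauge(
    "sensor_last_temperature",
    "Most recently observed temperature reading",
    ["sensor_id"],
)

# Expose metrics over HTTP on an assumed port before entering the poll loop.
start_http_server(8000)

# Inside the consumer loop, after sensor_data = json.loads(...):
#     MESSAGES_CONSUMED.inc()
#     LAST_TEMPERATURE.labels(sensor_id=str(sensor_data["sensor_id"])).set(
#         sensor_data["temperature"]
#     )
```

With something along these lines in place, the consumer's metrics would be scrapeable at http://localhost:8000/metrics while the poll loop runs.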