kafka_2.13-3.9.0/
Run services:
docker-compose up -d

Kafka UI:
http://localhost:8080

Prometheus:
http://localhost:9090

Grafana:
http://localhost:3000
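After `docker-compose up -d`, a quick way to confirm all three UIs are reachable is a small script. A minimal sketch using only the Python standard library; the ports come from docker-compose.yml, and the script only checks that each endpoint returns an HTTP response:

```python
# Quick reachability check for the web UIs started by docker-compose.
import urllib.request
import urllib.error

ENDPOINTS = {
    "Kafka UI": "http://localhost:8080",
    "Prometheus": "http://localhost:9090",
    "Grafana": "http://localhost:3000",
}

for name, url in ENDPOINTS.items():
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            print(f"{name}: HTTP {resp.status}")
    except (urllib.error.URLError, OSError) as exc:
        print(f"{name}: unreachable ({exc})")
```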
groups:
  - name: KafkaAlerts
    rules:
      - alert: HighPartitionLag
        expr: kafka_topic_partition_lag > 100
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "High partition lag on {{ $labels.topic }}"
          description: "Partition lag is {{ $value }} for topic {{ $labels.topic }}."
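Once Prometheus has loaded alerts.yml, active alerts can be inspected programmatically through its HTTP API. A minimal sketch, assuming Prometheus is reachable at localhost:9090 as noted above, using the documented /api/v1/alerts endpoint:

```python
# List currently active alerts via the Prometheus HTTP API.
import json
import urllib.request

with urllib.request.urlopen("http://localhost:9090/api/v1/alerts", timeout=5) as resp:
    payload = json.load(resp)

for alert in payload["data"]["alerts"]:
    name = alert["labels"].get("alertname", "?")
    print(f"{name}: state={alert['state']} value={alert.get('value')}")
```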
## SETTING UP PYTHON ENVIRONMENT (2 different containers)
- Activate venv
  - [ ] `source virtual-env/bin/activate`
- Install packages
  - [ ] `pip install -r requirements.txt`
- Run Kafka producer / consumer (both use the `sensor-data` topic; see the topic-creation sketch after this list)
  - [ ] `python kafka_consumer.py`
  - [ ] `python kafka_producer.py`

## SETTING UP KAFKA SERVER
- Download and unpack Kafka server
  - [ ] `wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz`
  - [ ] `tar -xzf kafka_2.13-3.9.0.tgz`
- Generate a cluster ID and format storage (KRaft mode)
  - [ ] `cd kafka_2.13-3.9.0`
  - [ ] `KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"`
  - [ ] `bin/kafka-storage.sh format --standalone -t "$KAFKA_CLUSTER_ID" -c config/kraft/reconfig-server.properties`
- Start server
  - [ ] `bin/kafka-server-start.sh config/kraft/reconfig-server.properties`
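If the broker does not auto-create topics, the `sensor-data` topic can be created explicitly with `confluent_kafka`'s `AdminClient` (already pulled in by requirements.txt). A minimal sketch; the partition count is an illustrative choice for a single-node setup:

```python
# Create the 'sensor-data' topic used by the producer and consumer scripts.
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})
futures = admin.create_topics(
    [NewTopic("sensor-data", num_partitions=3, replication_factor=1)]
)

for topic, future in futures.items():
    try:
        future.result()  # Raises on error (e.g., topic already exists)
        print(f"Created topic {topic}")
    except Exception as exc:
        print(f"Topic {topic}: {exc}")
```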
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: one advertised as kafka:29092 for other containers
      # (e.g. kafka-ui), one as localhost:9092 for clients on the host.
      # A single advertised localhost:9092 listener would break container clients.
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      # alerts.yml is referenced by prometheus.yml and must be mounted too
      - ./alerts.yml:/etc/prometheus/alerts.yml
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
    depends_on:
      - kafka
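Before launching the Python clients, it is worth confirming the broker answers on the host listener. A minimal sketch, reusing the `confluent_kafka` dependency; localhost:9092 matches the PLAINTEXT_HOST listener above:

```python
# Verify the Kafka broker is reachable from the host.
from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})
try:
    metadata = admin.list_topics(timeout=5)  # Raises if no broker responds
    print(f"Connected; topics: {sorted(metadata.topics)}")
except Exception as exc:
    print(f"Broker not reachable: {exc}")
```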
from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import os

# Create Confluent Kafka consumer; the bootstrap address can be overridden
# via the KAFKA_BOOTSTRAP_SERVERS env var set in docker-compose.yml
consumer = Consumer({
    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
})

# Subscribe to 'sensor-data' topic
consumer.subscribe(['sensor-data'])

# Consume messages from 'sensor-data' topic
try:
    while True:
        msg = consumer.poll(1.0)  # Poll for messages with a timeout of 1 second
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                print(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}")
            else:
                raise KafkaException(msg.error())
        else:
            # Proper message
            sensor_data = json.loads(msg.value().decode('utf-8'))  # Deserialize the JSON data
            print(f"Received: {sensor_data}")
            # You can process the data here (e.g., analyze, store in a DB)
except KeyboardInterrupt:
    print("Stopped by user")
finally:
    # Close down consumer to commit final offsets
    consumer.close()
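requirements.txt pins prometheus-client, but neither script exports metrics yet. One way to close that loop is to expose a counter from the consumer and let Prometheus scrape it. A minimal sketch; the port and metric name are illustrative choices, not part of the existing setup:

```python
# Sketch of instrumenting the consumer with prometheus-client
# (already listed in requirements.txt).
from prometheus_client import Counter, start_http_server

MESSAGES_CONSUMED = Counter(
    'sensor_messages_consumed_total',
    'Number of sensor-data messages consumed'
)

start_http_server(8000)  # Metrics served at http://localhost:8000/metrics

# Inside the consumer poll loop, after a message is successfully decoded:
#     MESSAGES_CONSUMED.inc()
```

Prometheus would then need a matching scrape job for this port, alongside the `kafka` and `zookeeper` jobs in prometheus.yml.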
from confluent_kafka import Producer
import json
import os
import random
import time

# Delivery report callback
def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")

# Create Confluent Kafka producer; the bootstrap address can be overridden
# via the KAFKA_BOOTSTRAP_SERVERS env var set in docker-compose.yml
producer = Producer({'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')})

# Generate random sensor data and send to 'sensor-data' topic
try:
    while True:
        sensor_data = {
            "sensor_id": random.randint(1, 10),         # Simulated sensor ID
            "temperature": random.uniform(20.0, 30.0),  # Random temperature value
            "humidity": random.uniform(30.0, 80.0)      # Random humidity value
        }
        # Send data to 'sensor-data' topic, keyed by sensor ID so readings
        # from the same sensor land on the same partition
        producer.produce(
            'sensor-data',
            key=str(sensor_data["sensor_id"]),
            value=json.dumps(sensor_data),
            callback=delivery_report
        )
        producer.poll(0)  # Serve delivery callback queue
        time.sleep(2)     # Simulate data being sent every 2 seconds
except KeyboardInterrupt:
    print("Stopped by user")
finally:
    producer.flush()
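With both scripts in place, a quick end-to-end check is to produce a single reading and read it back. A minimal sketch, assuming the broker at localhost:9092 and the `sensor-data` topic from the scripts above; the group id here is arbitrary:

```python
# Produce one reading and consume it back, end to end.
import json
from confluent_kafka import Consumer, Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce(
    'sensor-data',
    key='1',
    value=json.dumps({"sensor_id": 1, "temperature": 21.5, "humidity": 55.0})
)
producer.flush()

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'round-trip-check',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['sensor-data'])

msg = None
for _ in range(10):  # Poll up to ~10 seconds for a message
    msg = consumer.poll(1.0)
    if msg is not None and not msg.error():
        break

if msg is not None and not msg.error():
    print(f"Round trip OK: {json.loads(msg.value())}")
else:
    print("No message received")
consumer.close()
```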
global:
  scrape_interval: 15s

scrape_configs:
  # Port 7071 assumes a JMX/Prometheus exporter agent is attached to the
  # Kafka and ZooKeeper processes; docker-compose.yml does not set one up.
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka:7071']
  - job_name: 'zookeeper'
    static_configs:
      - targets: ['zookeeper:7071']

rule_files:
  - "alerts.yml"
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY kafka_consumer.py .
CMD ["python", "kafka_consumer.py"]
FROM openjdk:11-jre-slim
ENV KAFKA_VERSION=3.9.0
ENV SCALA_VERSION=2.13
WORKDIR /opt
COPY kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz .

# Extract the Kafka tarball copied in above
RUN tar -xzf kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz && \
    mv kafka_${SCALA_VERSION}-${KAFKA_VERSION} kafka && \
    rm kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz

WORKDIR /opt/kafka

# Generate a cluster ID and format storage at build time (KRaft mode).
# reconfig-server.properties matches the --standalone flag, as in the README.
RUN KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)" && \
    bin/kafka-storage.sh format --standalone -t "$KAFKA_CLUSTER_ID" -c config/kraft/reconfig-server.properties

CMD ["bin/kafka-server-start.sh", "config/kraft/reconfig-server.properties"]
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY kafka_producer.py .
CMD ["python", "kafka_producer.py"]
services:
  kafka:
    build:
      context: .
      dockerfile: Dockerfile.kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_LOG_DIRS: /tmp/kafka-logs
    volumes:
      - kafka-data:/tmp/kafka-logs
  producer:
    build:
      context: .
      dockerfile: Dockerfile.producer
    depends_on:
      - kafka
    environment:
      KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
    volumes:
      - .:/app
  consumer:
    build:
      context: .
      dockerfile: Dockerfile.consumer
    depends_on:
      - kafka
    environment:
      KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
    volumes:
      - .:/app

volumes:
  kafka-data:
confluent-kafka
prometheus-client