Aravind Prabhakar

Systems Engineer | Networking | Security | PreSales | Cloud | Devops | AIOps

View on GitHub Linkedin Blogs Tags
30 June 2024

Kafka producer and consumer

By Aravind

An example of a Kafka producer and consumer. A producer is used when you want to publish data to the Kafka message bus on a specific topic; a consumer is used when you want to read data from a specific topic.

Kafka Producer

import json
import random
import string
#import ipaddress
import logging
import time

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError

def _to_json_bytes(payload):
    """Return *payload* serialized as UTF-8 encoded JSON bytes."""
    return json.dumps(payload).encode('utf-8')


# Topic that main() publishes the generated records to.
topicName = 'ard'

# Module-level producer shared by main(); every message value is passed
# through the JSON serializer above before it is handed to the client.
producer = KafkaProducer(
    bootstrap_servers='10.143.64.140:59092',
    value_serializer=_to_json_bytes,
)
# Variant without an explicit bootstrap_servers argument:
#producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))

def main():
    """Publish ten randomly generated dummy BGP records to ``topicName``.

    Each record is a dict with the keys ``bgpGroup``, ``bgpNeighbor``,
    ``bgpAsn``, ``bgpNlri`` and ``bgpComm``. It is printed and then handed to
    the module-level ``producer``. After all records are queued, the producer
    is flushed and every delivery result is checked, so that failed sends are
    logged instead of being dropped silently. The producer is always closed,
    even if queuing a record raises.
    """
    bgp_group = ["group1", "group2", "group3", "group4", "group5"]
    bgp_comm_start = "100:"
    bgp_nlri = ["inet-vpn-unicast", "inet-unicast", "inet6-unicast", "inet6-vpn-unicast", "evpn", "inet-flow", "inet6-flow"]

    pending = []
    try:
        # Generate random data
        for i in range(0, 10):
            # Build a fresh dict per message so a record that has already
            # been handed to the producer is never mutated afterwards.
            bgpData = {
                "bgpGroup": random.choice(bgp_group),
                "bgpNeighbor": "101.101." + str(i) + ".1/24",
                "bgpAsn": random.randint(64512, 65534),
                "bgpNlri": random.choice(bgp_nlri),
                "bgpComm": bgp_comm_start + str(i),
            }
            print(bgpData)
            # send() is asynchronous; keep the future to check the outcome.
            pending.append(producer.send(topicName, bgpData))

        # Block until every queued record has been transmitted (or failed).
        producer.flush()
        for future in pending:
            try:
                future.get(timeout=10)
            except KafkaError:
                logging.exception("Failed to deliver message to topic %s", topicName)
    finally:
        producer.close()


if __name__ == "__main__":
    main()

Kafka Consumer

import kafka
import json
import concurrent.futures
from kafka import KafkaConsumer
from kafka import KafkaProducer

# Broker address passed as bootstrap_servers to the KafkaConsumer in main().
KAFKA_BOOTSTRAP_SERVERS_CONS = '10.143.64.140:59092'
# Topics to subscribe to; unpacked into the KafkaConsumer constructor in main().
kafka_topics = ["ard"]

def thread_kafka_topic_handler(message):
    """Print the JSON-decoded value of a record received on the 'ard' topic.

    ``message`` must expose ``topic`` and ``value`` attributes. Records from
    any other topic are ignored.
    """
    if message.topic != 'ard':
        return
    payload = json.loads(message.value)
    print(payload)


def main(retry_delay_seconds=5):
    """Consume records from ``kafka_topics`` and dispatch them to the handler.

    Tries to connect to the broker at ``KAFKA_BOOTSTRAP_SERVERS_CONS``. While
    the connection attempt fails, a message is printed and the attempt is
    repeated after ``retry_delay_seconds``. Once connected, every received
    record is submitted to a single-worker thread pool that runs
    ``thread_kafka_topic_handler``.

    Args:
        retry_delay_seconds: Pause between failed connection attempts.
    """
    # Local imports: the consumer script does not import these at file level.
    import time
    from kafka.errors import KafkaError

    consumer = None
    # One executor for the lifetime of the loop instead of one per pass.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        while True:
            if consumer is None:
                try:
                    consumer = KafkaConsumer(*kafka_topics,
                            max_poll_records=100000,
                            bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS)
                except KafkaError:
                    # Catch only Kafka client errors so Ctrl-C and genuine
                    # programming errors are not swallowed.
                    print("Kafka not conneccted to: ", KAFKA_BOOTSTRAP_SERVERS_CONS)
                    # Retry after a pause rather than falling through to the
                    # read loop with no consumer bound.
                    time.sleep(retry_delay_seconds)
                    continue
                print("kafka connected")

            for message in consumer:
                executor.submit(thread_kafka_topic_handler, message=message)


if __name__ == "__main__":
    main()
[ python  ] tags: python