05-Cloud-Architecture / Kafka / KafkaProducer.ipynb

{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Collecting kafka-python\n",
      "  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)\n",
      "                                              0.0/246.5 kB ? eta -:--:--\n",
      "     -------------------------------------- 246.5/246.5 kB 7.6 MB/s eta 0:00:00\n",
      "Installing collected packages: kafka-python\n",
      "Successfully installed kafka-python-2.0.2\n"
     ]
    }
   ],
   "source": [
    "!pip install kafka-python"
   ]
  },
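  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Optional sanity check (an addition, not in the original notebook): kafka-python exposes its version string as `kafka.__version__`, so a quick import confirms the install above actually worked."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Confirm the client library imports and report the installed version\n",
    "import kafka\n",
    "print('kafka-python version:', kafka.__version__)"
   ]
  },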
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "from kafka import KafkaProducer\n",
    "from time import sleep\n",
    "from json import dumps\n",
    "import json"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Setup Kafka producer\n",
    "producer = KafkaProducer(\n",
    "    bootstrap_servers=['54.208.97.228:9092'],  # change IP here\n",
    "    value_serializer=lambda x: dumps(x).encode('utf-8')\n",
    ")"
   ]
  },
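  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A minimal connectivity check before streaming the whole dataset (a sketch, not part of the original flow): send one throwaway record and block on the returned future. It assumes the `healthcare-provider-records` topic used later already exists on the broker, or that topic auto-creation is enabled."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Send a single test record and wait for the broker's acknowledgement.\n",
    "# Assumes the 'healthcare-provider-records' topic exists (or auto-creation is enabled).\n",
    "future = producer.send('healthcare-provider-records', value={'ping': 'connectivity-test'})\n",
    "metadata = future.get(timeout=10)  # raises a KafkaError if the broker cannot be reached\n",
    "print(f'Delivered to {metadata.topic} partition {metadata.partition} at offset {metadata.offset}')"
   ]
  },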
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read the dataset\n",
    "df = pd.read_csv(\"C:\\\\Users\\\\Yash Joshi\\\\Documents\\\\GitHub\\\\Multi-Class-Prediction-of-Obesity-Risk\\\\01-Dataset\\\\01-Data-for-model-building\\\\train.csv\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Define columns for each topic based on data source\n",
    "healthcare_provider_records_cols = ['id', 'Gender', 'Age', 'Height', 'Weight', 'family_history_with_overweight', 'NObeyesdad']\n",
    "lifestyle_and_diet_surveys_cols = ['id', 'FAVC', 'FCVC', 'NCP', 'CAEC', 'SMOKE', 'SCC', 'FAF', 'TUE', 'CALC', 'MTRANS']\n",
    "nutritional_data_systems_cols = ['id', 'CH2O']"
   ]
  },
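  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A small guard (an addition, not in the original notebook) that checks every column listed above actually exists in the loaded DataFrame, so a typo fails here rather than halfway through a send loop."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Verify that every column referenced by the three topic definitions is present in df\n",
    "all_cols = set(healthcare_provider_records_cols + lifestyle_and_diet_surveys_cols + nutritional_data_systems_cols)\n",
    "missing = all_cols - set(df.columns)\n",
    "print('Missing columns:', missing if missing else 'none')"
   ]
  },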
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Function to send rows to Kafka topics\n",
    "def send_data(topic, columns):\n",
    "    for index, row in df.iterrows():\n",
    "        # Create a dictionary from the row only including specified columns\n",
    "        data = row[columns].to_dict()\n",
    "        # Send the data to the specified Kafka topic\n",
    "        producer.send(topic, value=data)\n",
    "        sleep(0.01)  # Delay between sends to avoid overloading the server\n",
    "    producer.flush()  # Clear data from Kafka server after sending all rows"
   ]
  },
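  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If downstream consumers need all records for one person to arrive in order on the same partition, a keyed variant of the same loop can use the `id` column as the message key. The sketch below is an alternative under assumptions, not part of the original pipeline: `keyed_producer` and `send_data_keyed` are hypothetical names, and it reuses the same broker address as above."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sketch: keyed sends so that all records sharing an id hash to the same partition.\n",
    "# A separate producer is used because it needs a key_serializer in addition to the value_serializer.\n",
    "keyed_producer = KafkaProducer(\n",
    "    bootstrap_servers=['54.208.97.228:9092'],  # same broker as above\n",
    "    key_serializer=lambda k: str(k).encode('utf-8'),\n",
    "    value_serializer=lambda x: dumps(x).encode('utf-8')\n",
    ")\n",
    "\n",
    "def send_data_keyed(topic, columns):\n",
    "    for _, row in df.iterrows():\n",
    "        data = row[columns].to_dict()\n",
    "        # The 'id' value becomes the Kafka message key\n",
    "        keyed_producer.send(topic, key=data['id'], value=data)\n",
    "        sleep(0.01)\n",
    "    keyed_producer.flush()"
   ]
  },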
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sending data to each topic\n",
    "send_data('healthcare-provider-records', healthcare_provider_records_cols)\n",
    "send_data('lifestyle-and-diet-surveys', lifestyle_and_diet_surveys_cols)\n",
    "send_data('nutritional-data-systems', nutritional_data_systems_cols)"
   ]
  },
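  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `send_data` calls above are fire-and-forget: each `send()` returns a future, and delivery errors go unnoticed unless that future is inspected. One way to get an explicit per-record delivery report (a sketch using one sample row from the first topic's columns) is to attach success and error callbacks to the future."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sketch: attach callbacks to a send() future to get an explicit delivery report for one record\n",
    "def on_success(metadata):\n",
    "    print(f'Delivered to {metadata.topic}[{metadata.partition}] at offset {metadata.offset}')\n",
    "\n",
    "def on_error(exc):\n",
    "    print(f'Delivery failed: {exc}')\n",
    "\n",
    "sample = df[healthcare_provider_records_cols].iloc[0].to_dict()\n",
    "producer.send('healthcare-provider-records', value=sample).add_callback(on_success).add_errback(on_error)\n",
    "producer.flush()"
   ]
  },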
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [],
   "source": [
    "producer.flush() "
   ]
  },
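  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Once everything has been flushed, closing the producer shuts down its background sender thread cleanly (a small addition, not in the original notebook)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Close the producer after the final flush; waits up to 10 seconds for any pending sends\n",
    "producer.close(timeout=10)"
   ]
  },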
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "base",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}