141 lines (140 with data), 3.8 kB
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Collecting kafka-python\n",
" Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)\n",
" 0.0/246.5 kB ? eta -:--:--\n",
" -------------------------------------- 246.5/246.5 kB 7.6 MB/s eta 0:00:00\n",
"Installing collected packages: kafka-python\n",
"Successfully installed kafka-python-2.0.2\n"
]
}
],
"source": [
"# Install the Kafka client into the kernel's own environment (%pip rather than !pip).\n",
"# The version is pinned to the one this notebook was originally run with.\n",
"%pip install kafka-python==2.0.2"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import os\n",
"from json import dumps\n",
"from time import sleep\n",
"\n",
"import pandas as pd\n",
"from kafka import KafkaProducer"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"# Setup Kafka producer\n",
"# Broker addresses are read from KAFKA_BOOTSTRAP_SERVERS (comma-separated host:port pairs).\n",
"# When the variable is unset or empty, the original broker address is used.\n",
"DEFAULT_BOOTSTRAP_SERVERS = '54.208.97.228:9092'\n",
"bootstrap_servers = [\n",
"    address.strip()\n",
"    for address in (os.environ.get('KAFKA_BOOTSTRAP_SERVERS') or DEFAULT_BOOTSTRAP_SERVERS).split(',')\n",
"    if address.strip()\n",
"]\n",
"producer = KafkaProducer(\n",
"    bootstrap_servers=bootstrap_servers,\n",
"    # Each message value is JSON-encoded and then sent as UTF-8 bytes\n",
"    value_serializer=lambda x: dumps(x).encode('utf-8')\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"# Read the dataset\n",
"# Set TRAIN_CSV_PATH to point at train.csv on another machine; when it is unset,\n",
"# the original author's local path below is used.\n",
"train_csv_path = os.environ.get(\n",
"    \"TRAIN_CSV_PATH\",\n",
"    \"C:\\\\Users\\\\Yash Joshi\\\\Documents\\\\GitHub\\\\Multi-Class-Prediction-of-Obesity-Risk\\\\01-Dataset\\\\01-Data-for-model-building\\\\train.csv\",\n",
")\n",
"df = pd.read_csv(train_csv_path)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"# Columns published to each topic, grouped by data source.\n",
"# 'id' is the first entry of every list.\n",
"healthcare_provider_records_cols = [\n",
"    'id',\n",
"    'Gender', 'Age', 'Height', 'Weight',\n",
"    'family_history_with_overweight',\n",
"    'NObeyesdad',\n",
"]\n",
"lifestyle_and_diet_surveys_cols = [\n",
"    'id',\n",
"    'FAVC', 'FCVC', 'NCP', 'CAEC',\n",
"    'SMOKE', 'SCC', 'FAF', 'TUE',\n",
"    'CALC', 'MTRANS',\n",
"]\n",
"nutritional_data_systems_cols = ['id', 'CH2O']"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"# Function to send rows to Kafka topics\n",
"def send_data(topic, columns, data=None, kafka_producer=None, delay=0.01):\n",
"    \"\"\"Publish one message per row of a DataFrame to a Kafka topic.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    topic : str\n",
"        Name of the Kafka topic to publish to.\n",
"    columns : list of str\n",
"        Columns to include in each message. A missing column raises\n",
"        ``KeyError`` before anything is sent.\n",
"    data : pandas.DataFrame, optional\n",
"        Frame to read rows from. Defaults to the notebook-level ``df``.\n",
"    kafka_producer : optional\n",
"        Object providing ``send(topic, value=...)`` and ``flush()``.\n",
"        Defaults to the notebook-level ``producer``.\n",
"    delay : float, optional\n",
"        Seconds to pause after each send (default 0.01). 0 disables the pause.\n",
"    \"\"\"\n",
"    frame = df if data is None else data\n",
"    target = producer if kafka_producer is None else kafka_producer\n",
"    # Select the columns once; each record is a {column: value} dict for one row\n",
"    records = frame[list(columns)].to_dict(orient='records')\n",
"    for record in records:\n",
"        target.send(topic, value=record)\n",
"        if delay:\n",
"            sleep(delay)  # Delay between sends to avoid overloading the server\n",
"    # Block until every buffered message has been sent; flush() does not delete anything on the broker\n",
"    target.flush()"
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [],
"source": [
"# Publish each column group to its own topic, one topic after another\n",
"for topic_name, topic_cols in (\n",
"    ('healthcare-provider-records', healthcare_provider_records_cols),\n",
"    ('lifestyle-and-diet-surveys', lifestyle_and_diet_surveys_cols),\n",
"    ('nutritional-data-systems', nutritional_data_systems_cols),\n",
"):\n",
"    send_data(topic_name, topic_cols)"
]
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {},
"outputs": [],
"source": [
"# Final flush: wait for any messages still buffered in the producer\n",
"producer.flush()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "base",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}