Flask App
Here's the layout for present project. [FOLDER] repersents folder and rest are files can be identified by their extensions(ie .py, .html. .ini .txt. pdf)
Project layout
├── app.py
├── [covidInfo]
│ ├── covidData.py
│ └── pinCodeData.csv
├── [getCredentials]
│ ├── config.ini
│ └── read.py
├── [loggingdb]
│ └── saveLog.py
├── [Procfile]
├── requirements.txt
└── [sendDetailedEmail]
├── email.py
├── FAQ1.pdf
└── Template_corona_info.html
Click on the top right corner to see the full codes. Folowing are the glimpse of the main portion of the code -
inside app.py we have severals webhooks -
this one to get world data -
world wise
# route to get covid data wrt World. It follows same flow as PIN
@app.route("/covidWorld", methods=['POST'])
@cross_origin()
def covidWorld():
try:
dataRequest = request.get_json()
logDetails(whoAmI="user", query=dataRequest)
country = dataRequest["country"]
covid = Covid()
result = covid.get_status_by_country_name(country)
response = {"fullFilmenttext":result}
logDetails(whoAmI="bot", query=response)
return response
except Exception as e:
response = {"fullFilmenttext":f"process failed due to Error:{e}"}
logDetails(whoAmI="bot", query=response)
return response
To get pincode wise data -
pincode
# route to get covid data PIN code wise
@app.route("/covidInfo", methods=['POST'])
@cross_origin()
def covidInfo():
try:
# get user's request details
dataRequest = request.get_json()
# log the details in the db
logDetails(whoAmI="user", query=dataRequest)
# send details to the user as per PIN and State
# if keys contains PIN
if "PIN" in dataRequest.keys():
# get details wrt PIN
obj = CovidCasesIndia(PIN=dataRequest["PIN"])
result = obj.get_data()
if "state" in dataRequest.keys():
# get detila wrt state
obj = CovidCasesIndia(state=dataRequest["state"])
result = obj.get_data()
# record response in dict format and return
response = {"fullFilmenttext":result}
logDetails(whoAmI="bot", query=response)
return response
except Exception as e:
# in case of failure save error log and send the response with error msg
response = {"fullFilmenttext":f"process failed due to Error:{e}"}
logDetails(whoAmI="bot", query=response)
return response
To send the mail following webhook is used -
sendMail
# route to save sendmail
@app.route("/sendMail", methods=["POST"])
@cross_origin()
def sendMail():
try:
# get client info
clientInfo = request.get_json()
# log the details in the db
logDetails(whoAmI="user", query=clientInfo['name'])
# get user's email to send the FAQ
clientEmail = clientInfo['email']
clientName = clientInfo['name']
clientPhone = clientInfo['phone']
# send mail to the user
# d is a temp var can be used for debuging
d = MailAttachment(clientEmail=clientEmail).send()
# save the response in dictionary
response = {"fullFilmenttext":f"mail sent to {clientEmail}"}
# save the response in the db and return response
logDetails(whoAmI="bot", query=response)
return response
except Exception as e:
# in case of failure save error log and send the response with error msg
response = {"fullFilmenttext":f"process failed due to Error:{e}"}
logDetails(whoAmI="bot", query=response)
return response
Following are the supporting modules for app.py
To save logs -
saveLogs.py
from pymongo import MongoClient
import time
import json
class Log:
'''
It logs the data into cloud.mongodb.com
'''
def __init__(self, dbName="responder_googledb", MAX_RECORDS=100, dbPASSWORD=None):
self.MAX_RECORDS = MAX_RECORDS
self.dbName = dbName
self.dbPASSWORD = dbPASSWORD
# connection string is optained fromt the mongodb cloud
self.connectionString= f"mongodb+srv://responder_google01:{self.dbPASSWORD}@cluster0-cjxui.mongodb.net/test?retryWrites=true&w=majority"
self.client = MongoClient(self.connectionString)
def insertLog(self, enterLog=None):
'''Method to insert logs into the cloud mongodb'''
# get the db
db = self.client.get_database(self.dbName)
# get the records in which you wish to log data
records = db.responder_google01
# only enter into records if it hasn't reached the max_records
# max_records=100 by default
if records.count_documents({}) <= self.MAX_RECORDS:
# get current date and time
self.date, self.time = time.strftime("%Y-%m-%d|%H:%M:%S").split("|")
self.client = MongoClient(self.connectionString)
# save log, date, and time into a new record var
new_record = {"date":self.date,
"time":self.time,
"log":json.dumps(enterLog)} # had to jsonify
# insert the new record and return the success msg
records.insert_one(new_record)
return "Logged Successfully"
else:
return "MAX records reached. Delete previous records"
To send mail
email.py
# import neccessary libs and modules
import smtplib
import os
from email.message import EmailMessage
from getCredentials.read import ConfigReader
# define paths for attchments and HTML tempplate to send
CURRFOLDER="sendDetailedEmail"
# HTML_TEMPLATE_NAME = "DLM_Template.html"
HTML_TEMPLATE_NAME = "Template_corona_info.html"
HTML_TEMPLATE_PATH = os.path.join(CURRFOLDER, HTML_TEMPLATE_NAME)
# get the auth keys to send the mail
AUTH_DATA = ConfigReader()
eMAIL = AUTH_DATA.read_config()["eMAILsender"]
ePASSKEY = AUTH_DATA.read_config()["ePASSKEY"]
class MailAttachment:
'''
This class sends mail to the client by attaching necessary attachment
It attaches an HTML template as well.
'''
def __init__(self, clientEmail=None):
self.clientEmail = clientEmail
def send(self):
# put the Subject, From and To data for email
msg = EmailMessage()
msg['Subject'] = "Detailed Information about Covid-19"
msg['From'] = eMAIL
msg['To'] = self.clientEmail
# add the text content which is shown if HTML is off at the client
msg.set_content('Hi,\n\tPlease find the attachment below. \nRegards,\nSunny')
# attach the HTML content
with open(HTML_TEMPLATE_PATH, "r") as f:
html_content = f.read()
msg.add_alternative(html_content, subtype='html')
# adding other attachements
pdf_files = ['FAQ1.pdf']
for file in pdf_files:
path = os.path.join(CURRFOLDER, file)
with open(path, "rb") as f:
file_data = f.read()
file_name = f.name.split("/")[-1]
msg.add_attachment(file_data, maintype='application',
subtype='octet-stream', filename=file_name)
# login with the auth keys and send the email
with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp:
smtp.login(eMAIL, ePASSKEY)
print("sending email...")
smtp.send_message(msg)
print("email Sent")
To get credentials
read.py
import configparser
import os
# get the path of config file
CURRENT_FOLDER = "getCredentials"
CONFIG_FILE = "config.ini"
CONFIG_FILE_PATH = os.path.join(CURRENT_FOLDER, CONFIG_FILE)
class ConfigReader:
'''
It read the following from the config file-
1. senders email
3. login keys of sender's email
3. passwords for the database
'''
def __init__(self):
self.configFileName = CONFIG_FILE_PATH
def read_config(self):
'''reads the congfig file to get details'''
self.config = configparser.ConfigParser()
self.config.read(self.configFileName)
self.configuration = self.config["DEFAULT"]
# extract the secret info from the config file and return the info
self.eMAILsender = self.configuration["eMAILsender"]
self.ePASSKEY = self.configuration["ePASSKEY"]
self.dbPASSWORD = self.configuration['dbPASSWORD']
return self.configuration
To covid data -
covidData.py
import pandas as pd
import requests
import json
import os
# get path of PINCODE csv file
UTILS = "covidInfo"
PINCODE_DATA_PATH = os.path.join(UTILS, "pinCodeData.csv")
class CovidCasesIndia:
'''
This serves you with State and PINCODE wise data.
'''
def __init__(self, PIN=None, state=None, df_path = PINCODE_DATA_PATH):
self.PIN = PIN
self.state = state
# read csv
self.df = pd.read_csv(df_path)
# APIs to get state and district wise covid data
self._stateAPI = "https://api.rootnet.in/covid19-in/stats/latest"
self._districtAPI = "https://api.covid19india.org/state_district_wise.json"
def get_data(self):
'''gets data wrt to PIN or state'''
if self.PIN is not None:
return {"district":self._get_data_fromPIN(), "state":self._get_data_fromSTATE()}
else:
return {"state":self._get_data_fromSTATE()}
def _get_district_data(self, state, district):
'''gets districts wise data'''
# read json paypload from district API
json_data = requests.get(self._districtAPI).json()
# extract district data and return the result
d_data = json_data[state]["districtData"][district]
return d_data
def _get_data_fromPIN(self):
'''get PIN wise data'''
def query_pincode(PIN):
'''gets state and district name from csv by using PINCODE provided'''
try:
# check if PIN is valid
if (type(PIN) in [int, str]) and (self.df["Pincode"].isin([PIN]).any()): # already_read
mask = int(PIN) == self.df['Pincode']
result = self.df[mask]
return result
return "Invalid PIN code or data is not availble"
except Exception as e:
return str(e)
query = query_pincode(self.PIN)
# extract state and distt. name from query
self.state = query.State.iloc[0]
self.dist = query.District.iloc[0]
try:
# return this if data of distt is present
return {"district":self.dist, "confirmed":self._get_district_data(self.state, self.dist)['confirmed']}
except:
# return this if data is not found from the requested API
return {"district":self.dist, "confirmed":0}
def _get_data_fromSTATE(self):
'''gets data state wise from the API'''
json_data = requests.get(self._stateAPI).json()
def query_state_wise(query):
for loc in json_data["data"]["regional"]:
if loc['loc'] == query:
return loc
break
return query_state_wise(self.state)