forked from yaso_meth/mih-project
224 lines
5.6 KiB
Python
224 lines
5.6 KiB
Python
from typing import Union
|
|
#import DatabaseConnect
|
|
import mysql.connector
|
|
from fastapi import FastAPI, HTTPException
|
|
from pydantic import BaseModel
|
|
|
|
app = FastAPI()
|
|
|
|
def dbConnect():
|
|
return mysql.connector.connect(
|
|
host="mysqldb",
|
|
user="root",
|
|
passwd="root",
|
|
database="patient_manager"
|
|
)
|
|
|
|
class fileRequest(BaseModel):
|
|
DocOfficeID: int
|
|
patientID: int
|
|
|
|
# Check if server is up
|
|
@app.get("/")
|
|
def read_root():
|
|
return serverRunning()
|
|
|
|
# Get Doctors Office By ID
|
|
@app.get("/docOffices/{docOffic_id}")
|
|
def read_docOfficeByID(docOffic_id: int):
|
|
db = dbConnect()
|
|
cursor = db.cursor()
|
|
query = "SELECT * FROM doctor_offices WHERE iddoctor_offices=%s"
|
|
cursor.execute(query, (docOffic_id,))
|
|
item = cursor.fetchone()
|
|
cursor.close()
|
|
db.close()
|
|
if item is None:
|
|
raise HTTPException(status_code=404, detail="Item not found")
|
|
return {"iddoctor_offices": item[0],
|
|
"office_name": item[1]}
|
|
|
|
# Get List of all Doctors Office
|
|
@app.get("/docOffices/")
|
|
def read_All_DoctorsOffice():
|
|
db = dbConnect()
|
|
cursor = db.cursor()
|
|
query = "SELECT * FROM doctor_offices"
|
|
cursor.execute(query)
|
|
items = [
|
|
{
|
|
"iddoctor_offices": item[0],
|
|
"office_name": item[1]
|
|
}
|
|
for item in cursor.fetchall()
|
|
]
|
|
cursor.close()
|
|
db.close()
|
|
return items
|
|
|
|
# Get Patient By ID Number
|
|
@app.get("/patients/{id_no}")
|
|
def read_patientByID(id_no: str):
|
|
db = dbConnect()
|
|
cursor = db.cursor()
|
|
query = "SELECT * FROM patients WHERE id_no=%s"
|
|
cursor.execute(query, (id_no,))
|
|
item = cursor.fetchone()
|
|
cursor.close()
|
|
db.close()
|
|
if item is None:
|
|
raise HTTPException(status_code=404, detail="Item not found")
|
|
return {"idpatients": item[0],
|
|
"id_no": item[1],
|
|
"first_name": item[2],
|
|
"last_name": item[3],
|
|
"email": item[4],
|
|
"cell_no": item[5],
|
|
"medical_aid_name": item[6],
|
|
"medical_aid_no": item[7],
|
|
"medical_aid_scheme": item[8],
|
|
"address": item[9],
|
|
"doc_office_id": item[10]}
|
|
|
|
# Get List of all patients
|
|
@app.get("/patients/")
|
|
def read_all_patients():
|
|
db = dbConnect()
|
|
cursor = db.cursor()
|
|
query = "SELECT * FROM patients"
|
|
cursor.execute(query)
|
|
items = [
|
|
{
|
|
"idpatients": item[0],
|
|
"id_no": item[1],
|
|
"first_name": item[2],
|
|
"last_name": item[3],
|
|
"email": item[4],
|
|
"cell_no": item[5],
|
|
"medical_aid_name": item[6],
|
|
"medical_aid_no": item[7],
|
|
"medical_aid_scheme": item[8],
|
|
"address": item[9],
|
|
"doc_office_id": item[10]
|
|
}
|
|
for item in cursor.fetchall()
|
|
]
|
|
cursor.close()
|
|
db.close()
|
|
return items
|
|
|
|
# Get List of all patients by Doctors Office
|
|
@app.get("/docOffice/patients/{docoff_id}")
|
|
def read_all_patientsby(docoff_id: str):
|
|
db = dbConnect()
|
|
cursor = db.cursor()
|
|
query = "SELECT * FROM patients where doc_office_id=%s"
|
|
cursor.execute(query, (docoff_id,))
|
|
items = [
|
|
{
|
|
"idpatients": item[0],
|
|
"id_no": item[1],
|
|
"first_name": item[2],
|
|
"last_name": item[3],
|
|
"email": item[4],
|
|
"cell_no": item[5],
|
|
"medical_aid_name": item[6],
|
|
"medical_aid_no": item[7],
|
|
"medical_aid_scheme": item[8],
|
|
"address": item[9],
|
|
"doc_office_id": item[10]
|
|
}
|
|
for item in cursor.fetchall()
|
|
]
|
|
cursor.close()
|
|
db.close()
|
|
return items
|
|
|
|
# Get List of all files
|
|
@app.get("/patients/files/")
|
|
def read_all_files():
|
|
db = dbConnect()
|
|
cursor = db.cursor()
|
|
query = "SELECT * FROM patient_files"
|
|
cursor.execute(query)
|
|
items = [
|
|
{
|
|
"idpatient_files": item[0],
|
|
"file_path": item[1],
|
|
"file_name": item[2],
|
|
"patient_id": item[3],
|
|
"insert_date": item[4],
|
|
}
|
|
for item in cursor.fetchall()
|
|
]
|
|
cursor.close()
|
|
db.close()
|
|
return items
|
|
|
|
# Get List of all files by patient
|
|
@app.get("/patients/files/{patientID}")
|
|
def read_all_files_by_patient(patientID: int):
|
|
db = dbConnect()
|
|
cursor = db.cursor()
|
|
query = "SELECT * FROM patient_files where patient_id = %s"
|
|
cursor.execute(query, (patientID,))
|
|
items = [
|
|
{
|
|
"idpatient_files": item[0],
|
|
"file_path": item[1],
|
|
"file_name": item[2],
|
|
"patient_id": item[3],
|
|
"insert_date": item[4],
|
|
}
|
|
for item in cursor.fetchall()
|
|
]
|
|
cursor.close()
|
|
db.close()
|
|
return items
|
|
|
|
# Get List of all notes
|
|
@app.get("/patients/notes/")
|
|
def read_all_notes():
|
|
db = dbConnect()
|
|
cursor = db.cursor()
|
|
query = "SELECT * FROM patient_notes"
|
|
cursor.execute(query)
|
|
items = [
|
|
{
|
|
"idpatient_notes": item[0],
|
|
"note_name": item[1],
|
|
"note_text": item[2],
|
|
"insert_date": item[3],
|
|
}
|
|
for item in cursor.fetchall()
|
|
]
|
|
cursor.close()
|
|
db.close()
|
|
return items
|
|
|
|
# Get List of all notes by patient
|
|
@app.get("/patients/notes/{patientID}")
|
|
def read_all_patientsby(patientID: int):
|
|
db = dbConnect()
|
|
cursor = db.cursor()
|
|
query = "SELECT * FROM patient_notes where patient_id = %s"
|
|
cursor.execute(query, (patientID,))
|
|
items = [
|
|
{
|
|
"idpatient_notes": item[0],
|
|
"note_name": item[1],
|
|
"note_text": item[2],
|
|
"insert_date": item[3],
|
|
}
|
|
for item in cursor.fetchall()
|
|
]
|
|
cursor.close()
|
|
db.close()
|
|
return items
|
|
|
|
|
|
def serverRunning():
|
|
return {"Status": "Server is Up and Running"}
|
|
|
|
|