Newer
Older
from pymongo import MongoClient
import json
from bson import ObjectId
import pandas as pd
from datetime import datetime, timedelta, timezone
# Importing the dataset in the database
def import_expenses(user_id):
    """Load expenses from ``data/Expenses.txt`` into the ``expenses`` collection.

    The file is read as JSON Lines: one JSON object per non-blank line. Each
    record gets a ``UserID`` field holding ``ObjectId(user_id)`` and is
    inserted on its own, so records that precede a malformed line are already
    stored when parsing fails.

    Args:
        user_id: Value accepted by ``ObjectId`` identifying the owner of the
            imported expenses.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        OSError: If the data file cannot be opened.
    """
    # Validate the id before touching the database or the file.
    user_object_id = ObjectId(user_id)
    file_path = 'data/Expenses.txt'

    # Connect to the local MongoDB server
    client = MongoClient('mongodb://localhost:27017/')
    try:
        expenses_collection = client['financial_management']['expenses']
        # JSON text is UTF-8; do not rely on the platform default encoding.
        with open(file_path, 'r', encoding='utf-8') as file:
            for line in file:
                record = line.strip()
                if not record:
                    # Blank/trailing lines would make json.loads raise.
                    continue
                expense_data = json.loads(record)
                # Add the UserID reference before inserting
                expense_data['UserID'] = user_object_id
                expenses_collection.insert_one(expense_data)
    finally:
        # Release the connection pool even when the import fails part-way.
        client.close()
    print("Data import complete.")
# Importing categories data into the categories collection
def import_categories():
    """Load categories from ``data/categories.txt`` into the ``categories`` collection.

    The file is read as JSON Lines: one JSON object per non-blank line. All
    records are parsed first and then written with a single ``insert_many``
    call. When the file holds no records, nothing is inserted.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        OSError: If the data file cannot be opened.
    """
    # Path to the categories.txt file on your local machine
    file_path = 'data/categories.txt'

    # Read categories from the file, skipping blank/trailing lines that
    # would otherwise make json.loads raise.
    with open(file_path, 'r', encoding='utf-8') as file:
        stripped_lines = (line.strip() for line in file)
        categories_list = [json.loads(text) for text in stripped_lines if text]

    # insert_many rejects an empty list, so only write when there is data.
    if categories_list:
        # Connect to the local MongoDB server
        client = MongoClient('mongodb://localhost:27017/')
        try:
            categories_collection = client['financial_management']['categories']
            categories_collection.insert_many(categories_list)
        finally:
            # Release the connection pool even when the insert fails.
            client.close()
    print("Categories import complete.")
# Function to retrieve the database connection
# Process-wide MongoClient, created lazily by get_db(). A MongoClient owns a
# connection pool, so building a new one per call leaks pools and sockets.
_mongo_client = None


def get_db():
    """Return the ``financial_management`` database on the local MongoDB server.

    The underlying ``MongoClient`` is created on first use and reused by
    every later call.
    """
    global _mongo_client
    if _mongo_client is None:
        _mongo_client = MongoClient('mongodb://localhost:27017/')
    return _mongo_client['financial_management']
# Function to fetch expenses and category data from MongoDB and return as DataFrames
def fetch_data():
    """Fetch expenses and categories from MongoDB as two DataFrames.

    Expenses are projected to ``Amount``, ``DateOfExpense``, ``CategoryID``
    and ``MerchantName``; categories keep every field except ``_id``.

    Returns:
        tuple: ``(expenses_df, categories_df)``. When there are expense
        records, ``expenses_df['DateOfExpense']`` is converted with
        ``pd.to_datetime``. When the collection is empty, ``expenses_df`` is
        an empty DataFrame.
    """
    db = get_db()
    expense_projection = {'_id': 0, 'Amount': 1, 'DateOfExpense': 1, 'CategoryID': 1, 'MerchantName': 1}
    expenses_data = list(db.expenses.find({}, expense_projection))
    categories_data = list(db.categories.find({}, {'_id': 0}))

    expenses_df = pd.DataFrame(expenses_data)
    categories_df = pd.DataFrame(categories_data)

    # An empty result has no columns at all, so indexing 'DateOfExpense'
    # would raise KeyError; only convert when there are rows.
    if not expenses_df.empty:
        # Convert date strings to datetime objects
        expenses_df['DateOfExpense'] = pd.to_datetime(expenses_df['DateOfExpense'])
    return expenses_df, categories_df
# Function to fetch all budgets from the database
def fetch_all_budgets():
    """Return every budget document as a list of dicts.

    Each dict is limited to the projected fields ``CategoryID``,
    ``BudgetLimit``, ``StartDate``, ``EndDate`` and ``Period``; ``_id`` is
    excluded.
    """
    projection = {
        '_id': 0,
        'CategoryID': 1,
        'BudgetLimit': 1,
        'StartDate': 1,
        'EndDate': 1,
        'Period': 1,
    }
    budget_cursor = get_db().budgets.find({}, projection)
    return list(budget_cursor)
# Function to fetch all categories from the database
def fetch_categories():
    """Return every category document as a list of dicts.

    Each dict is limited to the projected fields ``CategoryID`` and
    ``CategoryName``; ``_id`` is excluded.
    """
    wanted_fields = {'_id': 0, 'CategoryID': 1, 'CategoryName': 1}
    category_cursor = get_db().categories.find({}, wanted_fields)
    return list(category_cursor)
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# Function to fetch budget data for a given category and date range
def get_budget_data(category_id, start_date, end_date, user_id="662947d472ecb2349f29cbe1"):
    """Fetch a user's budgets for one category that lie inside a date range.

    A budget matches when its ``StartDate`` is on or after ``start_date``
    and its ``EndDate`` is on or before ``end_date``.

    Args:
        category_id: Value compared against the ``CategoryID`` field.
        start_date: Lower bound applied to ``StartDate`` (``$gte``).
        end_date: Upper bound applied to ``EndDate`` (``$lte``).
        user_id: Value accepted by ``ObjectId`` identifying the budget owner.
            Defaults to the id that was previously hard-coded here, so
            existing three-argument callers behave as before.

    Returns:
        list: The matching budget documents (full documents, ``_id`` included).
    """
    db = get_db()
    budgets = list(db.budgets.find({
        "UserID": ObjectId(user_id),
        "CategoryID": category_id,
        "StartDate": {"$gte": start_date},
        "EndDate": {"$lte": end_date}
    }))
    return budgets
# Function to calculate the start and end of the current month
def get_time_bounds(now=None):
    """Return the first and last instant of the month containing ``now``.

    Args:
        now: Reference ``datetime``. Defaults to the current time in UTC.
            Its ``tzinfo`` is carried over to both returned values.

    Returns:
        tuple: ``(start, end)`` where ``start`` is midnight on day 1 of the
        month and ``end`` is one microsecond before the start of the
        following month.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)

    # December must roll over into January of the NEXT year; only bumping
    # the month would land in January of the same year, before `start`.
    if start.month == 12:
        next_month_start = start.replace(year=start.year + 1, month=1)
    else:
        next_month_start = start.replace(month=start.month + 1)

    end = next_month_start - timedelta(microseconds=1)
    return start, end
# Function to fetch budget status, total spent, and total budget for a user and category
def get_budget_status(user_id, category_id):
    """Summarise the current month's budget for a user and category.

    Args:
        user_id: Value accepted by ``ObjectId`` identifying the user.
        category_id: Category id, coerced with ``int``. The value ``1`` is
            treated as "no category filter" (assumed to be the 'All' entry).

    Returns:
        tuple: ``(budget_status, total_spent, total_budget)``.
        ``budget_status`` is the remaining budget formatted as a string, or
        ``"Budget exceeded"`` when spending is above the budget.
        ``total_spent`` sums ``Amount`` over this month's expenses and
        ``total_budget`` sums ``BudgetLimit`` over budgets whose
        ``StartDate`` falls within this month.
    """
    db = get_db()
    category_id = int(category_id)

    # Category 1 is assumed to stand for 'All', i.e. no category restriction.
    category_filter = {}
    if category_id != 1:
        category_filter = {"CategoryID": category_id}

    start_of_month, end_of_month = get_time_bounds()

    budget_query = {
        "UserID": ObjectId(user_id),
        **category_filter,
        "StartDate": {"$gte": start_of_month, "$lte": end_of_month},
    }
    monthly_budgets = list(db.budgets.find(budget_query))

    expense_query = {"UserID": ObjectId(user_id), **category_filter}
    expense_docs = list(db.expenses.find(expense_query))

    # Load the expenses into a DataFrame and keep only this month's rows.
    expenses_df = pd.DataFrame(expense_docs)
    if expenses_df.empty:
        total_spent = 0
    else:
        expense_dates = pd.to_datetime(expenses_df['DateOfExpense'])
        expenses_df['DateOfExpense'] = expense_dates.dt.tz_localize(timezone.utc)
        on_or_after_start = expenses_df['DateOfExpense'] >= start_of_month
        on_or_before_end = expenses_df['DateOfExpense'] <= end_of_month
        this_month_df = expenses_df[on_or_after_start & on_or_before_end]
        total_spent = this_month_df['Amount'].sum()

    # Total budget for the month, and what is left of it.
    total_budget = sum(budget['BudgetLimit'] for budget in monthly_budgets)
    monthly_budget_left = total_budget - total_spent

    if monthly_budget_left >= 0:
        budget_status = f"£{monthly_budget_left:.2f}"
    else:
        budget_status = "Budget exceeded"

    # total_spent and total_budget are returned for pie chart visualization.
    return budget_status, total_spent, total_budget
# Function to update the monthly budget in the database
def update_budget_in_db(user_id, category_id, amount, db):
    """Set the budget for a user and category to ``amount`` for this month.

    The budget document matching ``UserID`` and ``CategoryID`` gets its
    ``BudgetLimit``, ``StartDate`` and ``EndDate`` overwritten; the date
    fields are set to the bounds returned by ``get_time_bounds()``. When no
    document matches, one is inserted (upsert).

    Args:
        user_id: Value accepted by ``ObjectId`` identifying the user.
        category_id: Value stored/compared in the ``CategoryID`` field.
        amount: New value for ``BudgetLimit``.
        db: Database handle exposing a ``budgets`` collection.
    """
    month_start, month_end = get_time_bounds()

    selector = {
        "UserID": ObjectId(user_id),
        "CategoryID": category_id
    }
    new_values = {
        "$set": {
            "BudgetLimit": amount,
            "StartDate": month_start,
            "EndDate": month_end
        }
    }
    db.budgets.update_one(selector, new_values, upsert=True)
# Calling the functions defined above to set up the database
# Importing expenses from data/Expenses.txt
# import_expenses('662947d472ecb2349f29cbe1')  # Uncomment this to import expenses from data/Expenses.txt
# Importing categories
# import_categories()  # Uncomment this to import categories from data/categories.txt