Skip to content
Snippets Groups Projects 5.77 KiB
Newer Older
from pymongo import MongoClient
import json
from bson import ObjectId
import pandas as pd
from datetime import datetime, timedelta, timezone

# Importing the dataset in the database

def import_expenses(user_id):
    # Connect to the local MongoDB server
    client = MongoClient('mongodb://localhost:27017/')
    db = client['financial_management']
    expenses_collection = db['expenses']

    # The ObjectId of the user "tushar"
    user_object_id = ObjectId(user_id)

    file_path = 'data/Expenses.txt'
    with open(file_path, 'r') as file:
        for line in file:
            expense_data = json.loads(line.strip())
            # Add the UserID reference before inserting
            expense_data['UserID'] = user_object_id

    print("Data import complete.")

#Importing catergories data into categories collection
def import_categories():
    # Connect to the local MongoDB server
    client = MongoClient('mongodb://localhost:27017/')
    db = client['financial_management']
    categories_collection = db['categories']

    # Path to the categories.txt file on your local machine
    file_path = 'data/categories.txt'

    # Read categories from the categories.txt file
    categories_list = []
    with open(file_path, 'r') as file:
        for line in file:

    # Insert all categories into the database

    print("Categories import complete.")

# Function to retrieve the database connection
def get_db():
    client = MongoClient('mongodb://localhost:27017/')
    return client['financial_management']

# Function to fetch expenses and category data from MongoDB and return as DataFrames
def fetch_data():
    db = get_db()
    expenses_data = list(db.expenses.find({}, {'_id': 0, 'Amount': 1, 'DateOfExpense': 1, 'CategoryID': 1, 'MerchantName': 1}))
    categories_data = list(db.categories.find({}, {'_id': 0}))
    expenses_df = pd.DataFrame(expenses_data)
    categories_df = pd.DataFrame(categories_data)
    # Convert date strings to datetime objects
    expenses_df['DateOfExpense'] = pd.to_datetime(expenses_df['DateOfExpense'])
    return expenses_df, categories_df

# Function to fetch all budgets from the database
def fetch_all_budgets():
    db = get_db()
    return list(db.budgets.find({}, {'_id': 0, 'CategoryID': 1, 'BudgetLimit': 1, 'StartDate': 1, 'EndDate': 1, 'Period': 1}))

# Function to fetch all categories from the database
def fetch_categories():
    db = get_db()
    categories = list(db.categories.find({}, {'_id': 0, 'CategoryID': 1, 'CategoryName': 1}))
    return categories

# Function to fetch budget data for a given category and date range
def get_budget_data(category_id, start_date, end_date):
    db = get_db()
    budgets = list(db.budgets.find({
        "UserID": ObjectId("662947d472ecb2349f29cbe1"), 
        "CategoryID": category_id,
        "StartDate": {"$gte": start_date},
        "EndDate": {"$lte": end_date}
    return budgets

# Function to calculate the start and end of the current month
def get_time_bounds():
    now =, minute=0, second=0, microsecond=0)
    start = now.replace(day=1)  # Start of this month
    end = (start.replace(month=start.month % 12 + 1, day=1) - timedelta(microseconds=1))  # End of this month
    return start, end

# Function to fetch budget status, total spent, and total budget for a user and category
def get_budget_status(user_id, category_id):
    db = get_db()
    category_id = int(category_id)

    if category_id == 1:  # Assuming '1' is the ID for 'All'
        category_filter = {}
        category_filter = {"CategoryID": category_id}

    start_of_month, end_of_month = get_time_bounds()

    monthly_budgets = list(db.budgets.find({
        "UserID": ObjectId(user_id),
        "StartDate": {"$gte": start_of_month, "$lte": end_of_month},

    expenses = list(db.expenses.find({
        "UserID": ObjectId(user_id),

    # Convert expenses to DataFrame and filter by date
    expenses_df = pd.DataFrame(expenses)
    if not expenses_df.empty:
        expenses_df['DateOfExpense'] = pd.to_datetime(expenses_df['DateOfExpense']).dt.tz_localize(timezone.utc)

    filtered_expenses_df = expenses_df[(expenses_df['DateOfExpense'] >= start_of_month) & (expenses_df['DateOfExpense'] <= end_of_month)]
    total_spent = filtered_expenses_df['Amount'].sum() if not expenses_df.empty else 0

    # Calculate total budget for the month
    total_budget = sum(b['BudgetLimit'] for b in monthly_budgets)

    # Calculate budget left
    monthly_budget_left = total_budget - total_spent
    budget_status = f"£{monthly_budget_left:.2f}" if monthly_budget_left >= 0 else "Budget exceeded"
    # Returning total spent and budget limit for pie chart visualization
    return budget_status, total_spent, total_budget

# Function to update the monthly budget in the database
def update_budget_in_db(user_id, category_id, amount, db):
    start_of_month, end_of_month = get_time_bounds()

    budgets_collection = db.budgets
            "UserID": ObjectId(user_id),
            "CategoryID": category_id
            "$set": {
                "BudgetLimit": amount,
                "StartDate": start_of_month,
                "EndDate": end_of_month

# Calling the function created above to set up the database

# Importing expenses from Expenses.txt
# import_expenses('662947d472ecb2349f29cbe1') #Uncomment this to import expenses from data/expenses.txt

# Importing  categories
# import_categories() #Uncomment this to import expenses from data/categories.txt