Skip to content
Snippets Groups Projects
db.py 5.77 KiB
Newer Older
from pymongo import MongoClient
import json
from bson import ObjectId
import pandas as pd
from datetime import datetime, timedelta, timezone

# Importing the dataset in the database

def import_expenses(user_id):
    """Load expenses from ``data/Expenses.txt`` into the ``expenses`` collection.

    The file is read as JSON Lines: one JSON object per non-blank line. Each
    record gets a ``UserID`` field referencing the given user before it is
    inserted into the ``financial_management`` database on the local server.

    Args:
        user_id: Value accepted by ``ObjectId`` identifying the user who owns
            the imported expenses.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        Exception: Whatever ``ObjectId(user_id)`` raises for an invalid id.
    """
    # Validate the id up front so a bad id fails before any connection is made.
    user_object_id = ObjectId(user_id)

    file_path = 'data/Expenses.txt'

    # Connect to the local MongoDB server
    client = MongoClient('mongodb://localhost:27017/')
    try:
        expenses_collection = client['financial_management']['expenses']

        with open(file_path, 'r') as file:
            for raw_line in file:
                line = raw_line.strip()
                if not line:
                    # Blank lines (e.g. a trailing newline) are not records.
                    continue
                expense_data = json.loads(line)
                # Add the UserID reference before inserting
                expense_data['UserID'] = user_object_id
                expenses_collection.insert_one(expense_data)
    finally:
        # Release the connection pool even if the import fails midway.
        client.close()

    print("Data import complete.")


# Importing categories data into the categories collection
def import_categories():
    """Load categories from ``data/categories.txt`` into the ``categories`` collection.

    The file is read as JSON Lines: one JSON object per non-blank line. All
    parsed records are inserted in a single ``insert_many`` call. If the file
    contains no records, nothing is inserted and no connection is opened.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # Path to the categories.txt file on your local machine
    file_path = 'data/categories.txt'

    # Parse the whole file first so a malformed line aborts before any insert.
    with open(file_path, 'r') as file:
        stripped_lines = (line.strip() for line in file)
        categories_list = [json.loads(line) for line in stripped_lines if line]

    if not categories_list:
        # insert_many() rejects an empty list, so there is nothing to do.
        print("No categories to import.")
        return

    # Connect to the local MongoDB server
    client = MongoClient('mongodb://localhost:27017/')
    try:
        categories_collection = client['financial_management']['categories']
        categories_collection.insert_many(categories_list)
    finally:
        # Release the connection pool even if the insert fails.
        client.close()

    print("Categories import complete.")


# Function to retrieve the database connection
# Process-wide client, created lazily on the first get_db() call.
_MONGO_CLIENT = None


def get_db():
    """Return a handle to the ``financial_management`` database.

    A single ``MongoClient`` is created on first use and reused by every
    later call, instead of opening a new client (and connection pool) per
    call that is never closed.

    Returns:
        The ``financial_management`` database object of the shared client.
    """
    global _MONGO_CLIENT
    if _MONGO_CLIENT is None:
        _MONGO_CLIENT = MongoClient('mongodb://localhost:27017/')
    return _MONGO_CLIENT['financial_management']

# Function to fetch expenses and category data from MongoDB and return as DataFrames
def fetch_data():
    """Fetch all expenses and categories and return them as DataFrames.

    Expenses are projected down to ``Amount``, ``DateOfExpense``,
    ``CategoryID`` and ``MerchantName``; categories keep every field except
    ``_id``. ``DateOfExpense`` is converted with ``pd.to_datetime``.

    Returns:
        tuple: ``(expenses_df, categories_df)``. When there are no expenses,
        ``expenses_df`` is an empty frame that still has the four projected
        columns.
    """
    db = get_db()
    expense_fields = ['Amount', 'DateOfExpense', 'CategoryID', 'MerchantName']
    expense_projection = {'_id': 0, **{field: 1 for field in expense_fields}}

    expenses_data = list(db.expenses.find({}, expense_projection))
    categories_data = list(db.categories.find({}, {'_id': 0}))

    expenses_df = pd.DataFrame(expenses_data)
    categories_df = pd.DataFrame(categories_data)

    if expenses_df.empty:
        # A frame built from no records has no columns at all, which would
        # make the 'DateOfExpense' lookup below raise KeyError.
        expenses_df = pd.DataFrame(columns=expense_fields)

    # Convert date strings to datetime objects
    expenses_df['DateOfExpense'] = pd.to_datetime(expenses_df['DateOfExpense'])
    return expenses_df, categories_df

# Function to fetch all budgets from the database
def fetch_all_budgets():
    """Return every document in the ``budgets`` collection as a list of dicts.

    Only ``CategoryID``, ``BudgetLimit``, ``StartDate``, ``EndDate`` and
    ``Period`` are requested; ``_id`` is excluded.
    """
    budget_projection = {
        '_id': 0,
        'CategoryID': 1,
        'BudgetLimit': 1,
        'StartDate': 1,
        'EndDate': 1,
        'Period': 1,
    }
    budgets_cursor = get_db().budgets.find({}, budget_projection)
    return list(budgets_cursor)


# Function to fetch all categories from the database
def fetch_categories():
    """Return all categories as a list of dicts.

    Only ``CategoryID`` and ``CategoryName`` are requested; ``_id`` is
    excluded.
    """
    wanted_fields = {'_id': 0, 'CategoryID': 1, 'CategoryName': 1}
    categories_collection = get_db().categories
    return list(categories_collection.find({}, wanted_fields))

# Function to fetch budget data for a given category and date range
def get_budget_data(category_id, start_date, end_date,
                    user_id="662947d472ecb2349f29cbe1"):
    """Fetch a user's budgets for one category that lie inside a date range.

    A budget matches when its ``StartDate`` is on or after ``start_date`` and
    its ``EndDate`` is on or before ``end_date``.

    Args:
        category_id: Value matched against the budget's ``CategoryID``.
        start_date: Lower bound compared with ``StartDate`` (``$gte``).
        end_date: Upper bound compared with ``EndDate`` (``$lte``).
        user_id: Value accepted by ``ObjectId`` identifying the budget owner.
            Defaults to the id that was previously hard-coded here, so
            existing three-argument calls behave as before.

    Returns:
        list: The matching budget documents (including ``_id``).
    """
    db = get_db()
    budgets = list(db.budgets.find({
        "UserID": ObjectId(user_id),
        "CategoryID": category_id,
        "StartDate": {"$gte": start_date},
        "EndDate": {"$lte": end_date}
    }))
    return budgets

# Function to calculate the start and end of the current month
def get_time_bounds(now=None):
    """Return the first and last instant of the month containing ``now``.

    Args:
        now: Optional ``datetime`` whose month is used. Defaults to the
            current time in UTC. Its ``tzinfo`` is carried over unchanged to
            both returned values.

    Returns:
        tuple: ``(start, end)`` where ``start`` is midnight on day 1 of the
        month and ``end`` is one microsecond before the next month starts.
    """
    if now is None:
        now = datetime.now(timezone.utc)

    start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)

    # December must roll over into January of the *next* year; wrapping only
    # the month would put the end of the month eleven months before its start.
    if start.month == 12:
        next_month_start = start.replace(year=start.year + 1, month=1)
    else:
        next_month_start = start.replace(month=start.month + 1)

    end = next_month_start - timedelta(microseconds=1)
    return start, end

# Function to fetch budget status, total spent, and total budget for a user and category
def get_budget_status(user_id, category_id):
    """Summarise this month's budget and spending for a user and category.

    Budgets are those whose ``StartDate`` falls inside the current month (as
    given by ``get_time_bounds``). Expenses are all of the user's expenses
    for the category whose ``DateOfExpense`` falls inside the same month.

    Args:
        user_id: Value accepted by ``ObjectId`` identifying the user.
        category_id: Category id, converted with ``int``. The value ``1`` is
            treated as "All", meaning no category restriction is applied.

    Returns:
        tuple: ``(budget_status, total_spent, total_budget)`` where
        ``budget_status`` is the remaining budget formatted as ``£x.xx``, or
        ``"Budget exceeded"`` when spending is above the budget.
    """
    db = get_db()
    category_id = int(category_id)
    user_object_id = ObjectId(user_id)

    # Assuming '1' is the ID for 'All'
    category_filter = {} if category_id == 1 else {"CategoryID": category_id}

    start_of_month, end_of_month = get_time_bounds()

    monthly_budgets = list(db.budgets.find({
        "UserID": user_object_id,
        **category_filter,
        "StartDate": {"$gte": start_of_month, "$lte": end_of_month},
    }))

    expenses = list(db.expenses.find({
        "UserID": user_object_id,
        **category_filter
    }))

    # Convert expenses to DataFrame and filter by date
    expenses_df = pd.DataFrame(expenses)
    if expenses_df.empty:
        # No expenses means no columns either; indexing 'DateOfExpense'
        # would raise KeyError, so skip the date filter entirely.
        total_spent = 0
    else:
        # utc=True localizes naive values as UTC and converts aware ones, so
        # the comparison with the UTC month bounds works for either kind.
        expense_dates = pd.to_datetime(expenses_df['DateOfExpense'], utc=True)
        in_current_month = (expense_dates >= start_of_month) & (expense_dates <= end_of_month)
        total_spent = expenses_df.loc[in_current_month, 'Amount'].sum()

    # Calculate total budget for the month
    total_budget = sum(b['BudgetLimit'] for b in monthly_budgets)

    # Calculate budget left
    monthly_budget_left = total_budget - total_spent
    budget_status = f"£{monthly_budget_left:.2f}" if monthly_budget_left >= 0 else "Budget exceeded"

    # Returning total spent and budget limit for pie chart visualization
    return budget_status, total_spent, total_budget




# Function to update the monthly budget in the database
def update_budget_in_db(user_id, category_id, amount, db):
    """Upsert the budget document for a user and category.

    The document matching ``UserID`` and ``CategoryID`` gets its
    ``BudgetLimit`` set to ``amount`` and its ``StartDate``/``EndDate`` set
    to the bounds returned by ``get_time_bounds``. If no document matches,
    one is created (``upsert=True``).

    Args:
        user_id: Value accepted by ``ObjectId`` identifying the user.
        category_id: Value stored in / matched against ``CategoryID``.
        amount: New value for ``BudgetLimit``.
        db: Database object exposing a ``budgets`` collection.
    """
    month_start, month_end = get_time_bounds()

    collection = db.budgets
    selector = {
        "UserID": ObjectId(user_id),
        "CategoryID": category_id,
    }
    new_values = {
        "BudgetLimit": amount,
        "StartDate": month_start,
        "EndDate": month_end,
    }
    collection.update_one(selector, {"$set": new_values}, upsert=True)










# Calling the function created above to set up the database

# Importing expenses from Expenses.txt
# import_expenses('662947d472ecb2349f29cbe1')  # Uncomment this to import expenses from data/Expenses.txt

# Importing  categories
# import_categories()  # Uncomment this to import categories from data/categories.txt