updated with simplified version

This commit is contained in:
Chris Wong 2023-07-31 01:45:41 +08:00
parent 15faed98a8
commit d6dadd088b
5 changed files with 173 additions and 1423 deletions

File diff suppressed because it is too large Load Diff

View File

@ -1,13 +1,24 @@
from pdfminer.high_level import extract_text
import re
from datetime import datetime
import pdfplumber
import pandas as pd
# from pdfminer.high_level import extract_text
import tabula
class StatementParser:
def __init__(self, file_path: str):
def __init__(self, file_path):
self.file_path = file_path
def extract_text(self):
    """Return the text of every page of the PDF as a single string.

    The file at ``self.file_path`` is opened with pdfplumber and the text
    of each page is joined with a newline, in page order.

    Returns:
        str: The concatenated page text.
    """
    with pdfplumber.open(self.file_path) as pdf:
        # Guard against a page yielding None instead of a string:
        # str.join would raise TypeError on it, so treat it as empty.
        return "\n".join(page.extract_text() or "" for page in pdf.pages)
def extract_table(self):
    """Run tabula over all pages of the PDF at ``self.file_path``.

    Returns:
        Whatever ``tabula.read_pdf`` produces for ``pages="all"``.
    """
    return tabula.read_pdf(self.file_path, pages="all")
def extract_transactions(self):
# This method should be implemented by each subclass
@ -15,9 +26,72 @@ class StatementParser:
class DBSCreditCardStatementParser(StatementParser):
def extract_card_last_4_digits(self):
    """Return the final four digits of the card number in the statement.

    The first ``dddd-dddd-dddd-dddd`` sequence found in the extracted text
    is taken as the card number.

    Returns:
        str: The last dash-separated group of that sequence.

    Raises:
        ValueError: If the text contains no such sequence.
    """
    found = re.search(r"(\d{4}-\d{4}-\d{4}-\d{4})", self.extract_text())
    if not found:
        raise ValueError("Card number not found in the statement.")
    return found.group().rsplit("-", 1)[-1]
def extract_previous_balance(self):
    """Return the statement's previous balance as a signed number.

    Looks for ``PREVIOUS BALANCE <amount>`` in the extracted text, where
    the amount may be followed by `` CR``. The amount and the marker are
    handed to ``self.clean_amount`` together.

    Returns:
        The value produced by ``self.clean_amount`` for the matched amount.

    Raises:
        ValueError: If no previous-balance line is found.
    """
    full_text = self.extract_text()
    prev_balance_pattern = r"PREVIOUS BALANCE ([0-9,]+\.\d{2})( CR)?"
    prev_balance_match = re.search(prev_balance_pattern, full_text)
    if prev_balance_match is None:
        raise ValueError("Previous balance not found in the statement.")
    # The " CR" marker sits in its own optional group. Passing group 1 alone
    # would drop it, so clean_amount would never see the marker.
    figure, credit_marker = prev_balance_match.groups(default="")
    return self.clean_amount(figure + credit_marker)
def extract_transactions(self):
    """Parse the statement text into a DataFrame of card transactions.

    Each regex match supplies a transaction date, a posting date, a
    description and an amount (optionally suffixed with `` CR``). Dates
    are converted with ``self.add_year_to_date`` and amounts with
    ``self.clean_amount``.

    Returns:
        pandas.DataFrame: Columns ``Card Last 4 Digits``, ``TRANS DATE``,
        ``POST DATE``, ``DESCRIPTION``, ``AMOUNT HKD`` and
        ``RUNNING BALANCE``. The running balance is the cumulative sum of
        the amounts plus ``self.extract_previous_balance()``.

    Raises:
        ValueError: Propagated from the card-number, previous-balance or
            date helpers when the statement lacks the expected text.
    """
    full_text = self.extract_text()
    pattern = r"(\d{2} \w{3})\s(\d{2} \w{3})\s(.*?)\s([0-9,]+\.\d{2}(?: CR)?)"
    matches = re.findall(pattern, full_text, re.DOTALL)
    df = pd.DataFrame(
        matches, columns=["TRANS DATE", "POST DATE", "DESCRIPTION", "AMOUNT HKD"]
    )

    # add_year_to_date re-reads the statement on every call, so resolve each
    # distinct "DD Mon" string once and reuse the result for every row.
    resolved_dates = {}
    for column in ("TRANS DATE", "POST DATE"):
        for raw_date in df[column].unique():
            if raw_date not in resolved_dates:
                resolved_dates[raw_date] = self.add_year_to_date(raw_date)
        df[column] = df[column].map(resolved_dates.get)

    df["AMOUNT HKD"] = df["AMOUNT HKD"].apply(self.clean_amount)
    df.insert(0, "Card Last 4 Digits", self.extract_card_last_4_digits())
    df["RUNNING BALANCE"] = (
        df["AMOUNT HKD"].cumsum() + self.extract_previous_balance()
    )
    return df
def clean_amount(self, amount):
    """Convert a statement amount string into a signed float.

    Thousands separators are dropped. A string containing ``CR`` is
    returned negated; any other string is returned as a positive value.

    Args:
        amount: Text such as ``"1,234.56"`` or ``"1,234.56 CR"``.

    Returns:
        float: The numeric value, negative when ``CR`` is present.
    """
    digits = amount.replace(",", "")
    if "CR" not in amount:
        return float(digits)
    return -float(digits.replace("CR", ""))
def extract_statement_date(self):
    """Return the date printed after ``STATEMENT DATE`` in the statement.

    The date is expected as ``DD Mon YYYY`` and is parsed with the
    ``%d %b %Y`` format.

    Returns:
        datetime: The parsed statement date.

    Raises:
        ValueError: If the text has no ``STATEMENT DATE`` entry.
    """
    found = re.search(r"STATEMENT DATE (\d{2} \w{3} \d{4})", self.extract_text())
    if not found:
        raise ValueError("Statement date not found in the statement.")
    return datetime.strptime(found.group(1), "%d %b %Y")
def add_year_to_date(self, date):
    """Attach a year to a ``DD Mon`` date taken from the statement.

    The statement year is tried first. If that would place the date after
    the statement date, or the day does not exist in that year (29 Feb in
    a non-leap year), the previous year is used instead.

    Args:
        date: Day and abbreviated month, e.g. ``"05 Jan"``.

    Returns:
        datetime: The date with the chosen year.

    Raises:
        ValueError: If the text cannot be parsed as a date in either year.
    """
    statement_date = self.extract_statement_date()
    for year in (statement_date.year, statement_date.year - 1):
        try:
            candidate = datetime.strptime(date + " " + str(year), "%d %b %Y")
        except ValueError:
            # e.g. "29 Feb" in a non-leap year: fall through to the other year.
            continue
        if candidate <= statement_date:
            return candidate
    raise ValueError(
        "Could not place date '" + date + "' on or before the statement date."
    )
class BankSavingAccountStatementParser(StatementParser):
@ -26,9 +100,6 @@ class BankSavingAccountStatementParser(StatementParser):
pass
class FundAccountStatementParser(StatementParser):
def extract_transactions(self):
# Code specific to Fund Account statements

View File

@ -0,0 +1,46 @@
# %%
# Scratch notebook: tries several tabula.read_pdf options on one statement PDF.
# Cells are order-dependent; 'tables' is rebound by later cells.
import tabula
import pandas as pd
# Path to the PDF file (hard-coded for one machine; change before running elsewhere)
file_path = r"Z:\chris\projects\family_finance\personal-finance-database\data\raw\chris' statements\credit cards\sc\sc_credit_card_eStatement_202303.pdf"
# Use tabula with lattice=True to read the tables from all pages of the PDF
tables = tabula.read_pdf(file_path, pages='all', lattice=True)
print(len(tables))
# %%
# 'tables' is a list of DataFrames, one for each table found in the PDF.
# You can access individual tables like this:
# df1 = tables[0] # This is the first table
# df2 = tables[1] # This is the second table
# df3 = tables[2]
# df4 = tables[3]
# From here, you can clean up and process the data in each DataFrame as needed.
# For example, you might need to rename columns, convert data types, handle missing values, etc.
# Read the same PDF again with stream=True (replaces the result of the first cell)
# and print every table with its index.
tables = tabula.read_pdf(file_path, pages='all', stream=True)
for idx, table in enumerate(tables):
    print(idx)
    print(table)
# %%
# Work on the table at index 4 of the stream=True result
df = tables[4]
# %%
# Replace the "\r" characters in the cell values with a unique delimiter
df.replace(to_replace='\r', value='|', regex=True, inplace=True)
# %%
# Split the cells of the date column on the delimiter, then explode them into separate rows
df['Date\r日期'] = df['Date\r日期'].str.split('|')
df = df.explode('Date\r日期')
# %%
# Bare expression: displays the frame when run as a notebook cell
df
# %%
# tables = tabula.read_pdf(file_path, pages='all', stream=True)
# Third attempt, this time passing split_text=True
tables = tabula.read_pdf(file_path, pages='all', split_text=True)
# %%

View File

@ -1,15 +1,17 @@
import pytest
from pdf_parser import PdfParser
def test_pdf_parser_text_extraction():
    """extract_text() should hand back a non-empty string."""
    # The parser is built once; the earlier duplicate construction was redundant.
    pdf_parser = PdfParser("path_to_test_pdf")
    text = pdf_parser.extract_text()
    assert isinstance(text, str)
    assert len(text) > 0
def test_pdf_parser_table_extraction():
    """extract_table() should hand back a list."""
    # The parser is built once; the earlier duplicate construction was redundant.
    pdf_parser = PdfParser("path_to_test_pdf")
    tables = pdf_parser.extract_table()
    assert isinstance(tables, list)

28
tests/test_pdf_parser.py Normal file
View File

@ -0,0 +1,28 @@
import pandas as pd
import pytest
from pdf_parser import DBSCreditCardStatementParser
def test_dbs_credit_card_statement_parser():
    """Parsing the sample DBS statement yields the expected columns and row count."""
    # Column layout the parsed DataFrame must have, in order
    expected_columns = [
        "Card Last 4 Digits",
        "TRANS DATE",
        "POST DATE",
        "DESCRIPTION",
        "AMOUNT HKD",
        "RUNNING BALANCE",
    ]
    # Build the parser on the sample PDF used for testing and parse it.
    # NOTE(review): parse() is not among the parser methods visible in this
    # change — confirm it is defined on the parser class.
    statement_parser = DBSCreditCardStatementParser("tests/sample_dbs_statement.pdf")
    parsed = statement_parser.parse()
    assert list(parsed.columns) == expected_columns
    # Check the number of rows
    # (Replace 21 with the actual number of transactions in the sample PDF file)
    assert len(parsed) == 21