python+sqlite+vue+ai大模型api打造一个公司ai智能记账web应用代码

代码语言:python

所属分类:其他

代码描述:python+sqlite+vue+ai大模型api打造一个公司ai智能记账web应用代码,ai自动判别贷方与借方类目,自动生成带echarts图表的利润表、资产负债表、现金流量表,ui界面美观好看简洁。支持所有兼容openai的ai大模型api调用,后期可增加自动识别票据自动填写的新功能。

代码标签: python sqlite vue ai 大模型 api 打造 公司 ai 智能 记账 web 应用

下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开

#!/usr/local/python3/bin/python3
# -*- coding: utf-8 -*
# main.py
# 运行命令: uvicorn main:app --reload

import os
import datetime
from typing import List, Optional

# --- Python标准库和第三方库导入 ---
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey
from sqlalchemy.orm import sessionmaker, Session, relationship
from sqlalchemy.ext.declarative import declarative_base
import uvicorn
from openai import AsyncOpenAI, OpenAIError
import json

# --- 配置 ---
# !! 重要 !! 在这里替换为你的OpenAI API密钥或兼容opeani的其他的ai大模型
OPENAI_API_KEY = "sk-"
OPENAI_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"

# 数据库配置
DATABASE_URL = "sqlite:///./accounting.db"

# JWT安全配置
SECRET_KEY = "a_very_secret_key_for_jwt_token_that_is_long_and_secure"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60

# --- 数据库模型 (SQLAlchemy) ---
Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    hashed_password = Column(String)

class Account(Base):
    __tablename__ = "accounts"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, unique=True, index=True)
    category = Column(String)

class Transaction(Base):
    __tablename__ = "transactions"
    id = Column(Integer, primary_key=True, index=True)
    description = Column(String, index=True)
    amount = Column(Float, nullable=False)
    date = Column(DateTime, default=datetime.datetime.utcnow)
    debit_account_id = Column(Integer, ForeignKey("accounts.id"))
    credit_account_id = Column(Integer, ForeignKey("accounts.id"))
    debit_account = relationship("Account", foreign_keys=[debit_account_id])
    credit_account = relationship("Account", foreign_keys=[credit_account_id])

engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# --- Pydantic 数据模型 (用于API) ---
class AccountSchema(BaseModel):
    id: int
    name: str
    category: str
    class Config: from_attributes = True

class TransactionSchema(BaseModel):
    id: int; description: str; amount: float; date: datetime.datetime
    debit_account: AccountSchema; credit_account: AccountSchema
    class Config: from_attributes = True

class TransactionCreate(BaseModel):
    description: str; amount: float; debit_account_id: int; credit_account_id: int
    date: Optional[datetime.datetime] = None

class Token(BaseModel): access_token: str; token_type: str
class TokenData(BaseModel): username: Optional[str] = None
class AIDescription(BaseModel): description: str
class UserSchema(BaseModel):
    username: str
    class Config: from_attributes = True

# --- FastAPI应用实例 ---
app = FastAPI()

# --- 依赖项和安全认证 ---
def get_db():
    db = SessionLocal()
    try: yield db
    finally: db.close()

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def verify_password(plain, hashed): return pwd_context.verify(plain, hashed)
def get_password_hash(pwd): return pwd_context.hash(pwd)

async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    exc = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"})
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None: raise exc
    except JWTError: raise exc
    user = db.query(User).filter(User.username == username).first()
    if user is None: raise exc
    return user

# --- AI 服务 ---
async def get_ai_suggestion(description: str, db: Session):

    accounts = db.query(Account).all()
    account_list = ", ".join([f'"{a.name}" (ID: {a.id})' for a in accounts])
    prompt = f'Analyze transaction: "{description}". Extract amount, and find debit/credit account IDs from this list: {account_list}. Respond in strict JSON with keys "debit_account_id", "credit_account_id", "amount".'
    try:
      
        client = AsyncOpenAI(api_key=OPENAI_API_KEY,base_url=OPENAI_BASE_URL)
        resp = await client.chat.completions.create(model="qwen-max", messages=[{"role":"user","content":prompt}], response_format={"type":"json_object"}, temperature=0.1)
        sug = json.loads( resp.choices[0].message.content )
        if not all(k in sug for k in ["debit_account_id", "credit_account_id"]): raise ValueError("Missing account keys.")
        if sug.get("amount") is not None: sug['amount'] = float(sug['amount'])
        return sug
    except Exception as e: raise HTTPException(status_code=500, detail=f"AI suggestion failed: {e}")

# --- API Endpoints ---
@app.post("/token", response_model=Token)
async def login(db: Session = Depends(get_db), form: OAuth2PasswordRequestForm = Depends()):
    user = db.query(User).filter(User.username == form.username).first()
    if not user or not verify_password(form.password, user.hashed_password): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password")
    expire = datetime.datetime.utcnow() + datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    token = jwt.encode({"sub": user.username, "exp": expire}, SECRET_KEY, algorithm=ALGORITHM)
    return {"access_token": token, "token_type": "bearer"}

@app.get("/api/users/me", response_model=UserSchema)
async def me(current_user: User = Depends(get_current_user)): return current_user

@app.get("/api/accounts", response_model=List[AccountSchema])
async def get_accounts(db: Session = Depends(get_db), user: User = Depends(get_current_user)): return db.query(Account).order_by(Account.category, Account.name).all()

@app.get("/api/transactions", response_model=List[TransactionSchema])
async def get_tx(db: Session = Depends(get_db), user: User = Depends(get_current_user)): return db.query(Transaction).order_by(Transaction.date.desc()).all()

@app.post("/api/transactions", response_model=TransactionSchema)
async def create_tx(tx: TransactionCreate, db: Session = Depends(get_db), user: User = Depends(get_current_user)):
    if tx.debit_account_id == tx.credit_account_id: raise HTTPException(status_code=400, detail="Debit/credit accounts must be different.")
    if tx.amount <= 0: raise HTTPException(status_code=400, detail="Amount must be positive.")
    db_tx = Transaction(**tx.dict()); db.add(db_tx); db.commit(); db.refresh(db_tx)
    return db_tx

@app.post("/api/ai/suggest-category")
async def suggest(data: AIDescription, db: Session = Depends(get_db), user: User = Depends(get_current_user)):
    return JSONResponse(content=await get_ai_suggestion(data.description, db))

# --- 报表API ---
@app.get("/api/reports/{report_name}")
def get_report(report_name: str, db: Session = Depends(get_db), user: User = Depends(get_current_user)):
    if report_name == "income-statement":
        rev_accs = {a.id: a.name for a in db.query(Account).filter(Account.category == '收入').all()}
        exp_accs = {a.id: a.name for a in db.query(Account).filter(Account.category == '支出').all()}
        revs, total_rev = {}, 0
        for acc_id, acc_name in rev_accs.items():
            bal = sum(t.amount for t in db.query(Transaction.amount).filter(Transaction.credit_account_id == acc_id)) - sum(t.amount for t in db.query(Transaction.amount).filter(Transaction.debit_account_id == acc_id))
            if bal > 0: revs[acc_name] = bal; total_rev += bal
        exps, total_exp = {}, 0
        for acc_id, acc_name in exp_accs.items():
            bal = sum(t.amount for t in db.query(Transaction.amount).filter(Transaction.debit_account_id == acc_id)) - sum(t.amount for t in db.query(Transaction.amount).filter(Transaction.credit_account_id == acc_id))
            if bal > 0: exps[acc_name] = bal; total_exp += bal
        return {"revenues": [{"name": k,"value": v} for k,v in revs.items()], "expenses": [{"name": k,"value": v} for k,v in exps.items()], "total_revenue": total_rev, "total_expense": total_exp, "net_income": total_rev - total_exp}

    if report_name == "balance-sheet":
        net_income = get_report("income-statement", db, user)['net_income']
        balances = {}
        for acc in db.query(Account).all():
            bal = sum(t.amount for t in db.query(Transaction.amount).filter(Transaction.debit_account_id == acc.id)) - sum(t.amount for t in db.query(Transaction.amount).filter(Transaction.credit_account_id == acc.id))
            if acc.category not in ['资产', '支出']: bal = -bal
            if bal != 0: balances[acc.name] = {"balance": bal, "category": acc.category}
        assets = [{"name": n,"value": d["balance"]} for n,d in balances.items() if d["category"] == '资产']
        liabilities = [{"name": n,"value": d["balance"]} for n,d in balances.items() if d["category"] == '负债']
        equity = [{"name": n,"value": d["balance"]} for n,d in balances.items() if d["category"] == '权益']
        if net_income != 0: equity.append({"name": "本期净利润 (留存收益)", "value": net_income})
        total_assets = sum(i['value'] for i in assets)
        total_liab_equity = sum(i['value'] for i in liabilities) + sum(i['value'] for i in equity)
        return {"assets": assets, "liabilities": liabilities, "equity": equity, "total_assets": total_assets, "total_liabilities_and_equity": total_liab_equity}

    if report_name == "cash-flow-statement":
        cash_acc = db.query(Account).filter(Account.name == '现金').first()
        if not cash_acc: return {"inflows":[], "outflows":[], "net_cash_flow":0, "total_inflows":0, "total_outflows":0}
        inflows = [{"desc":f"{t.description} (from: {t.credit_account.name})","val":t.amount} for t in db.query(Transaction).filter(Transaction.debit_account_id == cash_acc.id).all()]
        outflows = [{"desc":f"{t.description} (to: {t.debit_account.name})","val":t.amount} for t in db.query(Transaction).filter(Transaction.credit_account_id == cash_acc.id).all()]
        total_in, total_out = sum(i['val'] for i in inflows), sum(o['val'] for o in outflows)
        return {"inflows":inflows, "outflows":outflows, "total_inflows":total_in, "total_outflows":total_out, "net_cash_flow":total_in - total_out}
    
    raise HTTPException(status_code=404, detail="Report not found")

# --- 数据库初始化 ---
def init_db():
    Base.metadata.create_all(bind=engine)
    db = SessionLocal()
    if db.query(User).count() > 0: db.close(); return
    print("数据库为空,正在进行初始化并载入示例数据...")
    db.add(User(username="admin", hashed_password=get_password_hash("password")))
    accounts = {n: Account(name=n, category=c) for n,c in [("现金","资产&quo.........完整代码请登录后点击上方下载按钮下载查看

网友评论0