3학년/Project-이때어때

[problem-2] 해결( 할 수 있을까 ? )

즈_말 2024. 3. 27. 15:30

OAuth2의 비밀번호 해싱 과 jwt토큰을 통한 bearer

 

이제 보안이 중요하다.jwt토큰과 비밀번호 해싱을 통해 우리 만들거를 안전하게 만들어보자꾸나.

이코드 따라하면 실제로 써두댐

이전장에서 했던거에서 이어갈거니까 이전 장 참고해

 

JWT에 관하여

JWT의 의미는 JSON Web Token을 의미함.

객체를 공백없이 길고 뺶뺶하게 코드화 함 아래와같이.

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

 

이건 암호화 되지 않아서 누구나 복호화하여 정보를 알 수 있음

근데 서명이 되어 있어서  머? 방출할수 있다? 토큰을 ?

이런방식으로 넌 1주짜리 만료 토큰을 만들어 .그리고 유저가 오늘끄고 내일오면 여전히 로그인가능함.

한 주 후에 토큰이 만료되면 유저는 로그인하지못하고 새로운 토큰을받아야함. 만약 유저가 토큰을 수정하거나 바꾸려고하면( 서드파티로 )너는 찾을 수 있음 왜냐면 토큰이 매칭되지 않기때문이얌

만약 jwt토큰이 어떻게 작동하는지 보고싶으면 https://jwt.io 로가서 확인해보삼

 

다운로드하셈 " python-jose " 를

pip install "python-jose[cryptography]"

python-jose는 백엔드에서 추가로 암호화가 필요해

여기 암호화하는 방법 중 추천하는 문서야.https://cryptography.io/en/latest/

 

Welcome to pyca/cryptography — Cryptography 43.0.0.dev1 documentation

© Copyright 2013-2024, Individual Contributors. Revision 44860178.

cryptography.io

( 위 문서는 나중에 뜯어주겠음 ㅎㅎ; )

 

패스워드 hashing하십시오

ㅎㅐ싱의 의미는 비밀번호와 같은 내용물을 일련의 횡설수설한 바이트( 걍 string이긴함 )로 바꾸는거야.

같은 횡설수설한 내용물을 언제든지 지나갈 수 있따? 

그러나 암호화된걸 다시 비밀번호로 바꾸지는 못해.

 

왜 비밀번호 해싱을 하는가.

만약 데베 탈취되면 나쁜놈들이 너의 평문 비번을 뺏어가.

그래서 나쁜넘들이 비번을통해 다른시스템에 ( 대부분의 유저는 비슷한 비번 쓰기때문에 )접속 할 수도 있는거지.

 

다운로드하여라 " passlib "

passlib은 해싱된 패스워드를 관리하기위한 좋은 패키지 툴이야.

많은 해싱알고리즘과 유틸리티를 이걸통해 사용가능함.

추천하는 알고리즘은 " Bcrypt "임

ㄱㅡ래서, 일단 passLib의 Bcrypt를 설치해보좌.

pip install "passlib[bcrypt]"

 

해시하고 인증하자 패스워드를

이제 다운받은 passlib을 import해줘야함.

만들어보자 

유틸리티 함수를 실행해서 사용자가 입력하는 비밀번호를 생성하자.

그리고 인증 유틸리티를 통해 패스워드가 해시저장값과 일치하는지 보자

그리고 사용자를 인증하구 반환하자.

 

from datetime import datetime, timedelta, timezone
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)]
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    return [{"item_id": "Foo", "owner": current_user.username}]

 

 

from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

추가된 건 위 코드인 듯 싶다.

 

 

 

절대 코드 다시짜기 귀찮아서가 아니라  ( 맞음 )

슬쩍 보아하니 ...jwt이런저런거 다 잘됐는데 리다이렉트에서 문제인거같음엔드포인트에 유저id넣으면 들어가지는것도

 

@w2m.get("/{id}", response_model=user_schema.User, response_class=HTMLResponse)
def get_user_by_code(id: str, request: Request, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.id == id).options(
        joinedload(User.groups)).first()
    user_dict = user_to_dict(user)
    return templates.TemplateResponse("index.html", {"request": request, "user_dict": user_dict})

 

이거 때문인거 같으니 과감하게 지워주구욘

@w2m.get("/", response_class=HTMLResponse)
def index(request: Request):
    return templates.TemplateResponse(request=request, name="index.html")


 

이걸 좀 수정 할 필요가 있음



후 저거 해석하다가 정신나갈거같아서 ( 왜 굳이? 왜 ? 저거 다 내가 진행한건데 딱보니까 ? )

해석하겠다는 의지는접어두고 하루동안 해결에 해맨결과

 

 

 

 

 

내 인생 최초 백엔드 1년째인 오늘!!!!!!!!!!!!!!!JWT토큰을 쿠키에저장하고 로그인로그아웃구현의 첫걸음을 딛었다는 말씀입니다!!!!!!!!!!!

@w2m.post("/", response_model=user_schema.Token)
async def login_for_access_token(response:Response, form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    # 사용자와 비밀번호 확인
    user = user_crud.get_user(db, form_data.username)
    if not user or not pwd_context.verify(form_data.password, user.pw):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="잘못된 사용자 이름 또는 비밀번호",
            headers={"WWW-Authenticate": "Bearer"},
        )
    exp = datetime.utcnow().astimezone(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # 액세스 토큰 생성
    data = {
        "sub": user.id,
        "exp": exp
    }
    access_token = jwt.encode(data, SECRET_KEY, algorithm=ALGORITHM)
   
    # 쿠키에 저장
    response.set_cookie(key="acess_token",value=access_token,expires=exp,httponly=True)
   
    return {
        "access_token": access_token,
        "token_type": "bearer",
        "username": data["sub"]
    }
from fastapi import status
   
@w2m.get("/logout", response_class=HTMLResponse)
async def logout(request: Request):
    response = RedirectResponse(url="/", status_code=status.HTTP_302_FOUND)
    response.delete_cookie(key="access_token")
    return response

 

나는 저 엔드포인트가

꼭 logout이라는 페이지에서 실행해야 저게 되는건 줄 알았는데 아니라

그냥 저게 실행되면 로그아웃이라는 엔드포인트로 가게되는거로

리디렉션은 response로 해주면 댄다는 말씀 ㅇㅇ

위 같은 코드로 토큰생성하고 만료일생성한뒤에


import { GetUser } from "/static/js/getUser.js";

class Login {
    constructor() {
        this.getUser= new GetUser;

        this.login();
    }

    login() {
        const loginBtn = document.querySelector('.submit');
        loginBtn.addEventListener('click', async (event) => {
            event.preventDefault();
   
            const loginUsername = document.getElementById('userID').value;
            const loginPassword = document.getElementById('userPassword').value;

            console.log(loginUsername)
   
            try {
                const response = await fetch('http://127.0.0.1:8099/', {
             
                method: 'POST',
                    headers: {
                        'Content-Type': 'application/x-www-form-urlencoded',
                    },
                    body: `grant_type=&username=${encodeURIComponent(loginUsername)}&password=${encodeURIComponent(loginPassword)}&scope=&client_id=&client_secret=`,
                });
                if (!response.ok) {
                    throw new Error('로그인 실패');
                }
                const data = await response.json();
                // Store the token or do something with the data
                document.cookie = `access_token=${data.access_token}; expires=${new Date(data.exp)}; path=/`;
                console.log("로그인 성공");
                // 리디렉션 처리
                window.location.href = "/" ;
                this.getUser.getCookie();

            } catch (error) {
                console.error('로그인 에러:', error);
            }
        });
    }
}

window.addEventListener('load', function() {
    new Login();
});

로그인 실패 유무는 요로코롬 확인해주고 ( 초안 )

export class GetUser {
    constructor() {
        // 현재 로그인 유저 아이디
        this.whoThisUser = null;
        this.getCookie();
    }

    getCookie() {
        console.log("겟쿠키확인");
        if (document.cookie) {
            const cookies = document.cookie.split(`; `).map((el) => el.split('='));
            if (cookies.length > 0) {
                let getItem = [];
                for (let i = 0; i < cookies.length; i++) {
                    getItem.push(cookies[i][1]);
                }
                if (getItem.length > 0) {
                    console.log(getItem);
                    for (let i = 0; i < getItem.length; i++) {
                        const cookieParts = getItem[i].split('.');
                        if (cookieParts.length === 3) {
                            const [headerB64, payloadB64, signature] = cookieParts;
                            const headers = JSON.parse(atob(headerB64));
                            const payload = JSON.parse(atob(payloadB64));
                           
                            // payload에 있는 sub 값으로 whoThisUser 설정
                            this.whoThisUser = payload.sub;

                            console.log(headers);
                            console.log(payload);
                            console.log(signature);
                        }
                    }
                }
            }
        }
    }
}

쿠키에저장된 정보를 토대로 유저를 저장하는 그런코드 ㅇㅇ

 

 

아........진짜 매우 감격스럽지만 이게 보안1도안되어있는 개허접쓰레기코드인걸알지만

첫걸음이 젤 ..중요하지요 ?

이제 보안적부분 검색해보고 

코드 좀 깔쌈하게만들고

js인터렉션 추가하면 댑니다!!!!!!!! ㅠㅠㅠ

 

 

 

여기서 추가로 생각해야할 건, 저렇게 구구절절 코드를 써야 유저를 확인 가능할까 ??입니다.

그냥 로그인 성공시 바로 input값을 페이지에 저장해서 디비정보 확인하게끔하는게 어떨지도 생각중이고...

뭔가 비효율의 끝판왕코드를 짠 것 같아 그렇네여 ㅇㅇ

728x90