I am using this library to reset the user’s password. This is the block of code that generates the code:
async def create_code(db: AsyncSession, user):
secret_key = pyotp.random_base32()
totp = pyotp.TOTP(secret_key, interval=settings.verification_code_expire_time)
verification_code = totp.now()
if not user.code:
user.code = VerificationCode(code=verification_code, secret_key=secret_key)
else:
user.code.code = verification_code
user.code.secret_key = secret_key
await db.commit()
return verification_code
I have two endpoints on to send the verification code to the email and the other to reset the user’s password. Here is the implementation of both methods:
Method to send the verification code:
async def send_verification_code(email: EmailStr, db: AsyncSession):
user = await find_user_by_email_one_or_none(db=db, email=email)
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
if not user.verified:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email not verified")
verification_code = await create_code(db, user)
body = f"This is your verification code {verification_code} to reset the password."
email_sender = EmailSender(receiver_email=user.email, subject="Password Reset", body=body)
await email_sender.send()
return {"detail": "Verification code has been sent to your email."}
Method to reset password:
async def reset_password(reset_password_schema: PasswordResetSchema, db: AsyncSession):
query = select(VerificationCode).where(VerificationCode.code == reset_password_schema.verification_code)
result = await db.execute(query)
verification_code = result.scalar_one_or_none()
if not verification_code:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid verification code.")
totp = pyotp.TOTP(verification_code.secret_key, interval=settings.verification_code_expire_time)
if not totp.verify(reset_password_schema.verification_code):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Verification code has been expired.")
query = update(User).where(User.id == verification_code.user_id).values(password=reset_password_schema.new_password)
await db.execute(query)
user = await find_user_by_id(db, verification_code.user_id)
if user.first_login:
user.first_login = False
query = delete(VerificationCode).where(VerificationCode.code == reset_password_schema.verification_code)
await db.execute(query)
await db.commit()
return {"detail": "Password has been reset successfully."}
The code works fine but sometime it shows the “Verification code has been expired” error even when the code is not expired. I tried tracing the error but the implementation seems fine as per the library’s documentation. I could not find out the actual reason behind this. Has anyone ever faced this issue? or am I missing out something important on this implementation?