Here is my backend code in Python:
def _invert_png(image_data):
    """Return encoded image bytes re-encoded as a colour-inverted PNG."""
    # PILImage (not Image) is used here: in this module ``Image`` is called as a
    # PDF flowable constructor, so it cannot also provide ``.open``.
    inverted = PIL.ImageOps.invert(PILImage.open(BytesIO(image_data)))
    buffer = BytesIO()
    inverted.save(buffer, format="PNG")
    return buffer.getvalue()


def _fit_pdf_image(image_data, scale):
    """Wrap encoded image bytes in a left-aligned PDF image flowable.

    The pixel size is shrunk in 10% steps until it is at most 513 x 736, then
    converted with ``cm / 37.795...`` (37.795... is 96 / 2.54, i.e. pixels per
    centimetre at 96 DPI) and multiplied by *scale*.
    """
    buffer = BytesIO(image_data)
    width, height = PILImage.open(buffer).size
    while width > 513 or height > 736:
        width, height = width * 0.9, height * 0.9
    unit_per_px = cm / 37.79527559055118
    flowable = Image(buffer, width=width * unit_per_px * scale, height=height * unit_per_px * scale)
    flowable.hAlign = "LEFT"
    return flowable


@app.route("/api/generate/all", methods=["POST"])
async def render_code_all():
    """Render several practicals into one PDF, upload it and return its link.

    Reads a JSON object from the request body with the keys ``practicals``
    (list of dicts), ``name``, ``enrollment_no``, ``_class``, ``invert``,
    ``semester`` and ``subject``. For each practical the code and output
    templates are filled in, rendered to images, and added to the PDF together
    with the aim and written work. Progress is reported after every practical
    through ``handle_generate_progress``.

    Returns:
        ``{"pdf_link": ...}`` on success; a JSON error with status 400 for a
        malformed body, missing/non-string ``name``/``enrollment_no``/``_class``
        or an unknown theme; status 500 when a font file is unavailable.
    """
    import re  # local import: only needed to sanitise the generated file name

    try:
        data = request.get_json()
        if not isinstance(data, dict):
            return jsonify({"error": "Invalid JSON format or missing required fields.", "success": False, "message": "Request body must be a JSON object."}), 400

        practicals = data.get("practicals", [])
        # NOTE(review): only practicals 6-10 are rendered because of this
        # hard-coded slice; it looks like a debugging leftover - confirm.
        practicals = practicals[5:10]
        name = data.get("name", -1)
        enrollment_no = data.get("enrollment_no", -1)
        _class = data.get("_class", -1)
        invert = data.get("invert", False)
        semester = data.get("semester", -1)
        subject = data.get("subject", -1)

        # These three values are used with str.replace below, so anything that
        # is not a string (including the -1 "missing" default) is rejected here
        # instead of surfacing later as an unhandled AttributeError/TypeError.
        required = {"name": name, "enrollment_no": enrollment_no, "_class": _class}
        missing_values = [field for field, value in required.items() if not isinstance(value, str)]
        if missing_values:
            return jsonify({"error": f"Missing or invalid required fields: {', '.join(missing_values)}", "success": False}), 400

        styles = Styles()
        available_themes = styles.styles.keys()

        # name / enrollment_no come from the client and end up in a filesystem
        # path, so every character that is not a word character, dot or dash
        # (spaces and path separators included) becomes an underscore.
        safe_name = re.sub(r"[^\w.-]", "_", name)
        safe_enrollment_no = re.sub(r"[^\w.-]", "_", enrollment_no)
        pdf_filename = f"{safe_name}_{safe_enrollment_no}_{uuid.uuid4().hex}.pdf"

        doc = SimpleDocTemplate(pdf_filename, pagesize=letter,
                                rightMargin=0.6*inch, leftMargin=0.6*inch,
                                topMargin=0.3*inch, bottomMargin=0.3*inch)
        pdf_styles = getSampleStyleSheet()
        elements = [Paragraph(f"{subject} | Semester {semester}", pdf_styles["Title"])]

        def fill_placeholders(template):
            """Substitute the student placeholders in a code/output template."""
            return (template.replace("{{name}}", name)
                            .replace("{{enrollment_no}}", enrollment_no)
                            .replace("{{class}}", _class))

        total = len(practicals)
        for index, practical in enumerate(practicals, start=1):
            aim = practical.get("aim", "")
            written_work = practical.get("written_work", "")
            code_template = practical.get("code_template", "")
            code_theme = practical.get("code_template_highlight", "default")
            output_template = practical.get("output_template", "")
            output_theme = practical.get("output_highlight", "default")
            language = practical.get("code_language", "")
            num = practical.get("num", "")

            if code_theme not in available_themes:
                return jsonify({"error": f"Invalid code_theme. Available code_themes: {', '.join(available_themes)}"}), 400
            code_theme = styles.get_style(code_theme)
            if output_theme not in available_themes:
                return jsonify({"error": f"Invalid output_theme. Available output_themes: {', '.join(available_themes)}"}), 400
            output_theme = styles.get_style(output_theme)

            wrapped_code = wrap_code(fill_placeholders(code_template), width=80)
            wrapped_output = wrap_code(fill_placeholders(output_template), width=80)
            lexer = get_lexer_by_name(language)

            if font_file_code is None or font_file_output is None:
                return jsonify({"error": "Failed to download font file."}), 500

            code_formatter = ImageFormatter(
                style=code_theme,
                line_pad=10,
                font_size=40,
                line_numbers=False,
                font_name=font_file_code
            )
            output_formatter = ImageFormatter(
                style=output_theme,
                line_pad=10,
                font_size=40,
                line_numbers=False,
                font_name=font_file_output
            )
            # The output text is highlighted with the same lexer as the code.
            code_image_data = highlight(wrapped_code, lexer, code_formatter)
            output_image_data = highlight(wrapped_output, lexer, output_formatter)

            if invert:
                code_image_data = _invert_png(code_image_data)
                output_image_data = _invert_png(output_image_data)

            heading_ = Paragraph(f"Experiment - {num}", pdf_styles["Title"])
            heading_.hAlign = "CENTER"
            elements.append(heading_)
            elements.append(Paragraph("Aim", pdf_styles["Heading2"]))
            elements.append(Paragraph(aim, pdf_styles["BodyText"]))
            elements.append(Spacer(1, 12))
            elements.append(Paragraph("Written Work", pdf_styles["Heading2"]))
            # Line breaks in the written work become <br/> tags in the paragraph
            # markup (the previous code replaced every letter "n" instead).
            elements.append(Paragraph(written_work.replace("\n", "<br/>"), pdf_styles["BodyText"]))
            elements.append(Spacer(1, 12))
            elements.append(Paragraph("Code", pdf_styles["Heading2"]))
            elements.append(KeepTogether(_fit_pdf_image(code_image_data, code_scale)))
            elements.append(Spacer(1, 12))
            elements.append(Paragraph("Output", pdf_styles["Heading2"]))
            elements.append(KeepTogether(_fit_pdf_image(output_image_data, output_scale)))
            elements.append(Spacer(1, 24))

            # Page break between practicals but not after the last one; the
            # position is used because comparing dicts by equality would also
            # skip the break for an earlier practical identical to the last.
            if index < total:
                elements.append(PageBreak())

            progress = round((index / total) * 100, 2)
            print(f"Generated {num} of {total} practicals.")
            print(f"Progress: {progress}%")
            handle_generate_progress({'progress': progress})

        elements.append(Paragraph(f"<b>Name</b>: {name}<br/><b>Enrollment No</b>: {enrollment_no}<br/><b>Class</b>: {_class}", pdf_styles["BodyText"]))
        doc.title = f"{subject} | Semester {semester}"

        pdf_path = os.path.join(os.getcwd(), pdf_filename)
        try:
            doc.build(elements)
            r2_client.upload_file(pdf_path, pdf_filename, namespace_id)
        finally:
            # Remove the temporary PDF even when building or uploading fails.
            if os.path.exists(pdf_path):
                os.remove(pdf_path)

        pdf_link = f"https://files.praxout.co/{pdf_filename}"
        return jsonify({"pdf_link": pdf_link})
    except (json.JSONDecodeError, KeyError) as e:
        return jsonify({"error": "Invalid JSON format or missing required fields.", "success": False, "message": str(e)}), 400
Here is my form component, where I want the progress to be displayed:
/**
 * Form that asks `/api/practicals/generate` for a practical PDF and shows the
 * resulting document in a viewer once the request has finished.
 *
 * While the request is in flight the submit button is disabled and a progress
 * bar is rendered.
 */
export default function CreateOutPutForm() {
  // Link of the generated PDF, or null until a generation has succeeded.
  const [codePDFData, setCodePDFData] = useState<string | null>(null);
  const [loading, setLoading] = useState(false);
  const [values, setValues] = useState<Partial<z.infer<typeof formSchema>>>({});
  // NOTE(review): setProgress is only ever called with 0 in this component, so
  // the bar below never advances until something feeds it real progress.
  const [progress, setProgress] = useState(0);
  const { toast } = useToast();

  /** Submit the form values and store the returned PDF link on success. */
  const handleSubmit = async (values: typeof formSchema._type) => {
    try {
      setLoading(true);
      setProgress(0);
      const response = await fetch(`/api/practicals/generate`, {
        method: "POST",
        // Declare the body format explicitly instead of relying on the default.
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ ...values }),
      }).then((res) => res.json());

      if (response.pdf_link) {
        setCodePDFData(response.pdf_link);
        toast({
          title: "Practical Generated",
          description: "The practical has been generated successfully.",
        });
      } else {
        console.error("Failed to generate output.");
        toast({
          title: "Failed to Generate Practical",
          description: `An error occurred while generating the practical...`,
        });
      }
    } catch (error) {
      console.error("Error:", error);
      // A caught value is not guaranteed to be an Error, so narrow it before
      // reading `.message`.
      const message = error instanceof Error ? error.message : String(error);
      toast({
        title: "Failed to Generate Practical",
        description: `An error occurred while generating the practical: ${message}`,
      });
    } finally {
      setLoading(false);
    }
  };

  return (
    <div className="flex w-full flex-col items-center justify-center gap-8 pb-24 ">
      {/* NOTE(review): renders the raw `type` value on the page; looks like debug output. */}
      {JSON.stringify(values.type)}
      <AutoForm
        formSchema={formSchema}
        values={values}
        onParsedValuesChange={setValues}
        onSubmit={(values) => handleSubmit(values)}
        className="mx-auto max-w-xl"
      >
        <AutoFormSubmit disabled={loading} className="w-full">
          Generate Practical
        </AutoFormSubmit>
      </AutoForm>
      {codePDFData && !loading && (
        <PDFVIewer
          key={codePDFData}
          className="min-h-24 w-full"
          link={codePDFData}
        />
      )}
      {loading && (
        <div className="w-full">
          <Progress value={progress} />
        </div>
      )}
    </div>
  );
}
Here is my frontend API route, route.ts (/api/practicals/generate):
import { NextRequest, NextResponse } from "next/server";
import { neon } from "@neondatabase/serverless";
import axios from "axios";
import { io } from "socket.io-client";
// Base URL of the backend: taken from BACKEND_API_URI in production, otherwise
// the local server on port 5000.
const isProduction = process.env.NODE_ENV === "production";
const BACKEND_URI = isProduction
  ? process.env.BACKEND_API_URI!
  : "http://127.0.0.1:5000";

// Socket.IO client for the backend, created once when this module is loaded.
// NOTE(review): nothing in the visible part of this file uses `socket` -
// confirm it is consumed elsewhere.
export const socket = io(BACKEND_URI);
/** JSON body accepted by the POST handler of this route. */
type GeneratePracticalOutputRequestProps = {
  /** Chooses between generating one practical ("single") or all of a subject ("all"). */
  type: "single" | "all";
  // Used to look the practical(s) up in the database.
  semester: number;
  subject: string;
  /** Practical number; only read by the "single" branch. */
  num: number;
  // Student details forwarded unchanged to the backend.
  name: string;
  enrollment_no: string;
  _class: string;
  /** Forwarded unchanged to the backend as `invert`. */
  invert: boolean;
};
/**
 * Generate a practical PDF through the backend.
 *
 * Reads the request body, loads the matching practical(s) from the database
 * and forwards them to the backend: `/api/generate` for `type: "single"`,
 * `/api/generate/all` for `type: "all"`.
 *
 * Responds with `{ pdf_link, status: 200, success: true }` on success. Error
 * responses carry the failure both in the JSON body and in the HTTP status
 * code (400 for bad input, the backend's own status when it answered with an
 * error, 500 otherwise).
 */
export async function POST(req: NextRequest, res: NextResponse) {
  const sql = neon(process.env.DATABASE_URL!);

  if (!req || !req.body) {
    // The second argument sets the real HTTP status; putting `status` only in
    // the JSON body would still answer with HTTP 200.
    return NextResponse.json({ error: "Request body required.", status: 400 }, { status: 400 });
  }

  try {
    const { semester, subject, num, name, enrollment_no, _class, invert, type }: GeneratePracticalOutputRequestProps = await req.json();
    console.log(type)

    // Every backend call is authorised with the same API key header.
    const requestConfig = {
      headers: {
        "Authorization": process.env.BACKEND_API_KEY!,
      },
    };

    if (type === "single") {
      // Named `rows` so the query result no longer shadows the `res` parameter.
      const rows = await sql`SELECT DISTINCT * FROM practicals WHERE semester = ${semester} AND subject = ${subject} AND num = ${num};`;
      if (rows.length === 0) {
        return NextResponse.json({ message: "Practical does not exist.", status: 400, success: false }, { status: 400 });
      }
      const data = rows[0];
      const response = await axios.post(`${BACKEND_URI}/api/generate`, {
        code: data.code_template,
        language: data.code_language,
        name: name,
        enrollment_no: enrollment_no,
        _class: _class,
        code_theme: data.code_template_highlight,
        output_theme: data.output_highlight,
        num: num,
        semester: semester,
        subject: subject,
        invert: invert,
        aim: data.aim,
        written_work: data.written_work,
        generate_pdf: true,
        output: data.output_template,
      }, requestConfig);
      return NextResponse.json({ pdf_link: response.data.pdf_link, status: 200, success: true });
    }

    if (type === "all") {
      const rows = await sql`SELECT * FROM practicals WHERE semester = ${semester} AND subject = ${subject} ORDER BY num ASC;`;
      if (rows.length === 0) {
        return NextResponse.json({ message: "No practicals found.", status: 400, success: false }, { status: 400 });
      }
      const response = await axios.post(`${BACKEND_URI}/api/generate/all`, {
        practicals: rows,
        semester: semester,
        subject: subject,
        name: name,
        enrollment_no: enrollment_no,
        _class: _class,
        invert: invert,
      }, requestConfig);
      return NextResponse.json({ pdf_link: response.data.pdf_link, status: 200, success: true });
    }

    // Any other `type` used to fall through without returning a response.
    return NextResponse.json({ error: `Unsupported type: ${String(type)}`, status: 400, success: false }, { status: 400 });
  } catch (error) {
    console.error(error);
    // Narrow the caught value before reading axios-specific fields.
    if (axios.isAxiosError(error)) {
      if (error.response) {
        // The backend answered with an error status: pass its body and status on.
        return NextResponse.json({ error: error.response.data }, { status: error.response.status });
      }
      if (error.request) {
        return NextResponse.json({ error: "No response received from the server." }, { status: 500 });
      }
    }
    const message = error instanceof Error ? error.message : String(error);
    return NextResponse.json({ error: message }, { status: 500 });
  }
}
Methods I have already tried:
- Flask SSE
- SocketIO
- Webhooks
None of the above solutions worked. Any help would be appreciated 🙂
Please feel free to ask any questions regarding the code or any other related queries.