I am trying to compress a GIF file but I get the following error: it says it is not able to find the file when trying to upload it to S3.
MoviePy – Building file /tmp/download_compressed.gif
MoviePy – – Generating GIF frames.
MoviePy – – Optimizing GIF with ImageMagick.
MoviePy – – File ready: /tmp/download_compressed.gif.
File does not exist after retries: /tmp/download_compressed.gif
Error compressing download.gif: Compressed file not found: /tmp/download_compressed.gif
Below is my Django view that handles the conversion:
import os
import re
import time
import tempfile
import boto3
from django.conf import settings
from django.core.files.base import ContentFile
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from moviepy.editor import VideoFileClip
import logging
logger = logging.getLogger(__name__)
class CompressGifView(APIView):
    """Accept one or more GIF uploads, compress each one (half resolution,
    10 fps), upload the results to S3, and return public download links.

    Expects the files under the multipart key ``files``. Responds 400 with
    the first per-file error dict, or 200 with ``download_links`` and
    ``success`` lists.
    """

    def post(self, request, *args, **kwargs):
        logger.debug("Received GIF compression request")
        files = request.FILES.getlist('files')
        s3 = boto3.client('s3')
        bucket_name = settings.AWS_STORAGE_BUCKET_NAME
        folder = 'compressed_gifs/'
        download_links = []
        success = []
        converted_file_paths = []
        channel_layer = get_channel_layer()

        def sanitize_name(name):
            """Restrict *name* to characters safe for both channel-layer
            group names and filesystem paths; capped at 99 chars (channel
            group name limit)."""
            # basename() first so a client-supplied name like "a/b.gif"
            # cannot point outside the temp directory.
            return re.sub(r'[^a-zA-Z0-9-_.]', '_', os.path.basename(name))[:99]

        def check_file_existence(file_path):
            """Poll for *file_path* for up to ~10 seconds to cover slow
            flushes by the external ImageMagick process."""
            for _ in range(10):
                if os.path.isfile(file_path):
                    logger.debug(f"File exists: {file_path}")
                    return True
                time.sleep(1)
            logger.error(f"File does not exist after retries: {file_path}")
            return False

        def compress_gif(file):
            """Compress one uploaded GIF and upload it to S3.

            Returns None on success, or an error dict suitable for a 400
            response on failure.
            """
            unique_channel = sanitize_name(f"file_status_group_{file.name}")
            clip = None
            resized_clip = None
            temp_input_path = None
            try:
                async_to_sync(channel_layer.group_add)(unique_channel, unique_channel)
                async_to_sync(channel_layer.group_send)(
                    unique_channel,
                    {'type': 'send_status', 'status': 'Getting Ready'}
                )
                logger.debug(f"Compressing GIF: {file.name}")
                temp_dir = tempfile.gettempdir()
                if not os.path.exists(temp_dir):
                    raise FileNotFoundError(f"Temporary directory does not exist: {temp_dir}")
                # BUG FIX: the raw upload name may contain spaces, slashes or
                # other characters that make ImageMagick write to (or fail on)
                # a path different from the one polled below — the source of
                # the "Compressed file not found" error. Build both temp
                # paths from a sanitized basename instead.
                safe_name = sanitize_name(file.name)
                temp_input_path = os.path.join(temp_dir, safe_name)
                temp_output_path = os.path.join(
                    temp_dir, f"{safe_name.rsplit('.', 1)[0]}_compressed.gif"
                )
                # Stream to disk chunk-by-chunk instead of buffering the
                # whole upload in memory with file.read().
                with open(temp_input_path, 'wb') as temp_input_file:
                    for chunk in file.chunks():
                        temp_input_file.write(chunk)
                logger.debug(f"Temporary input path: {temp_input_path}")
                logger.debug(f"Temporary output path: {temp_output_path}")
                # Compress: halve the resolution and cap the GIF at 10 fps.
                clip = VideoFileClip(temp_input_path)
                resized_clip = clip.resize(newsize=(clip.w // 2, clip.h // 2))
                resized_clip.set_fps(10).write_gif(temp_output_path, fps=10, program='ImageMagick')
                # Retry loop covers ImageMagick finishing its write slightly
                # after MoviePy reports completion.
                if not check_file_existence(temp_output_path):
                    raise FileNotFoundError(f"Compressed file not found: {temp_output_path}")
                file_size = os.path.getsize(temp_output_path)
                logger.debug(f"Compressed file size: {file_size} bytes")
                output_name = os.path.basename(temp_output_path)
                # upload_fileobj accepts any file-like object — stream the
                # file directly rather than copying it into a ContentFile.
                with open(temp_output_path, 'rb') as compressed_file:
                    logger.debug(f"Uploading compressed GIF: {output_name}")
                    s3.upload_fileobj(
                        compressed_file,
                        bucket_name,
                        folder + output_name,
                        ExtraArgs={'ContentType': 'image/gif'}
                    )
                download_link = f"https://{bucket_name}.s3.amazonaws.com/{folder}{output_name}"
                logger.debug(f"Generated download link for compressed GIF: {download_link}")
                download_links.append({
                    "url": download_link,
                    "original_name": file.name,
                    "converted_name": output_name,
                    "format": 'gif',
                })
                success.append({
                    "original_name": file.name,
                    "format": 'gif',
                })
                converted_file_paths.append(temp_output_path)
            except Exception as e:
                logger.error(f"Error compressing {file.name}: {e}")
                return {
                    "error": f"Error compressing {file.name}: {e}",
                    "original_name": file.name,
                    "format": 'gif',
                }
            finally:
                # Release MoviePy's ffmpeg readers (otherwise the reader
                # processes/file handles leak) and remove the input temp file.
                for c in (resized_clip, clip):
                    if c is not None:
                        try:
                            c.close()
                        except Exception:
                            pass
                if temp_input_path and os.path.exists(temp_input_path):
                    os.remove(temp_input_path)
            return None

        # Process uploads concurrently; compression is dominated by external
        # ffmpeg/ImageMagick work, so threads overlap well.
        from concurrent.futures import ThreadPoolExecutor
        try:
            with ThreadPoolExecutor() as executor:
                futures = [executor.submit(compress_gif, file) for file in files]
                for future in futures:
                    result = future.result()
                    if result:
                        return Response(result, status=status.HTTP_400_BAD_REQUEST)
        finally:
            # Clean up compressed temp files even when an error response
            # returned early (the original only cleaned up on success).
            for file_path in converted_file_paths:
                if os.path.exists(file_path):
                    os.remove(file_path)
        return Response({'message': 'Files processed successfully', 'download_links': download_links, 'success': success}, status=status.HTTP_200_OK)
test testing is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.