I’m trying to set up Google Cloud Logging for my Celery tasks, but I can’t get the logs to appear in the Google Cloud Logging console. I’ve added the logger configuration to my script, but the logs still never show up. Below is my setup:
Logger Configuration:
<code>import json
import os
import logging
from google.cloud import bigquery
from google.cloud import logging as cloud_logging
from google.auth import exceptions as google_auth_exceptions

# Initialize Google Cloud Logging client and set up logging
client = cloud_logging.Client()
client.setup_logging()

# Logger configuration
def setup_logger():
    # Define a global logger variable
    global logger
    logger = logging.getLogger('cloudLogger')
    # Check if the logger has handlers already
    if not logger.hasHandlers():
        logger.setLevel(logging.INFO)
        # Create a stream handler
        handler = logging.StreamHandler()
        handler.setLevel(logging.INFO)
        # Define a formatter without including the script name in the log message
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        # Add the handler to the logger
        logger.addHandler(handler)
    return logger

# Example usage
if __name__ == "__main__":
    logger = setup_logger()
    logger.info("Logger is set up and ready to use.")
</code>
Celery Task:
<code>import uuid
import redis
import logging
from celery import Celery
import boto3
import json
import time
from datetime import datetime
from collections import deque
from google.cloud import bigquery
from google.cloud import logging as cloud_logging
from google.auth import exceptions as google_auth_exceptions
import os
from logging_setup import setup_logger

app = Celery('upload_tasks')
app.config_from_object('celeryconfig')

config = load_config()
logger = setup_logger()

def upload_to_s3(file_path, s3_bucket, s3_key, logger):
    logger.info(f"Initializing S3 client for bucket {s3_bucket}...")
    s3_client = boto3.client('s3', region_name=S3_REGION,
                             aws_access_key_id=S3_ACCESS_KEY,
                             aws_secret_access_key=S3_SECRET_KEY)
    try:
        logger.info(f"Uploading {file_path} to s3://{s3_bucket}/{s3_key}...")
        s3_client.upload_file(file_path, s3_bucket, s3_key)
        logger.info(f"Successfully uploaded {file_path} to s3://{s3_bucket}/{s3_key}")
        return f"s3://{s3_bucket}/{s3_key}"
    except Exception as e:
        logger.error(f"Failed to upload to S3: {e}")
        return None

@app.task(bind=True, max_retries=50, queue='upload_files')
def upload_file(self, file_path, s3_bucket, s3_key, data, s3_location):
    logger = setup_logger()
    logger.info(f"Received task to upload {file_path} to S3 bucket {s3_bucket} with key {s3_key}.")
    try:
        s3_location = upload_to_s3(file_path, s3_bucket, s3_key, logger)
        if s3_location:
            store_metadata_in_bigquery(data, s3_location)
            os.remove(file_path)
            logger.info(f"Task completed for uploading {file_path} to s3://{s3_bucket}/{s3_key}")
    except Exception as exc:
        countdown = 5 * (2 ** self.request.retries)  # Exponential backoff
        logger.error(f"Error occurred during upload: {exc}. Retrying in {countdown} seconds...")
        raise self.retry(exc=exc, countdown=countdown)
</code>
Despite setting up the logger and including it in the Celery configuration, the logs are not showing up in the Google Cloud Logging console. I’ve verified that the service account has the necessary permissions and that the GOOGLE_APPLICATION_CREDENTIALS environment variable is set correctly.
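For reference, this is roughly the standalone sanity check I used outside of Celery to confirm that the credentials and the Logging client work at all (the log name is just a placeholder I picked):
<code>import logging
from google.cloud import logging as cloud_logging

# The client picks up GOOGLE_APPLICATION_CREDENTIALS automatically
client = cloud_logging.Client()

# Write one entry directly through the API, bypassing the stdlib logging module
client.logger("sanity-check").log_text("Direct API write from the worker host")

# Also route stdlib logging through Cloud Logging and emit a test record
client.setup_logging(log_level=logging.INFO)
logging.getLogger("cloudLogger").info("Test record via client.setup_logging()")
</code>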
What I’ve Tried:
- Ensured that the Google Cloud Logging API is enabled.
- Checked that the service account has the “Logging Admin” role.
- Verified that the GOOGLE_APPLICATION_CREDENTIALS environment variable is correctly set.
- Added the logging handler in various parts of the code to capture logs (one such variant is sketched below).
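To be concrete, one of those variants attaches the Cloud Logging handler from Celery’s logging-setup signals instead of at import time. It looks roughly like this (the handler and signal names come from the google-cloud-logging and Celery docs; the logger name "cloudLogger" is just my own):
<code>import logging
from celery.signals import after_setup_logger, after_setup_task_logger
from google.cloud import logging as cloud_logging
from google.cloud.logging.handlers import CloudLoggingHandler

client = cloud_logging.Client()

@after_setup_logger.connect
@after_setup_task_logger.connect
def attach_cloud_logging_handler(logger, **kwargs):
    """Attach the Cloud Logging handler after Celery has finished configuring
    its own logging, so Celery's setup cannot strip it away."""
    handler = CloudLoggingHandler(client, name="cloudLogger")
    handler.setLevel(logging.INFO)
    logger.addHandler(handler)
</code>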
Environment:
- Python 3.x
- Celery 4.x
- Google Cloud Logging library
What am I missing or doing wrong? Any help would be greatly appreciated!