I’m trying to send logs from a Python Azure Function to Datadog through their API, using the v2 datadog-api-client library. Some logs are getting lost and I can’t figure out why.
Here is my handler (it subclasses logging.Handler):
import atexit
import logging
import queue
import random
import threading
import time

from datadog_api_client.exceptions import ApiException
from datadog_api_client.v2 import ApiClient, Configuration
from datadog_api_client.v2.api import logs_api
from datadog_api_client.v2.models import HTTPLog, HTTPLogItem

from ts.common.logging.utils import generate_tags
from ts.domain.objects import Environment


class DatadogCustomLogHandler(logging.Handler):
    def __init__(
        self,
        service_name="default",
        host_name: str = None,
        environment: Environment = None,
        batch_size: int = 50,
        flush_interval: int = 5,
        **kwargs,
    ):
        super(DatadogCustomLogHandler, self).__init__()
        self.service_name = service_name
        self.host_name = host_name
        self.environment = environment
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.logs_queue = queue.Queue()
        self.is_shutting_down = threading.Event()
        self.logs_batch_lock = threading.Lock()
        self.thread = threading.Thread(target=self._worker)
        self.thread.daemon = True
        self.thread.start()
        atexit.register(self.cleanup)

    def emit(self, record):
        # Prevent blocking the application; drop the log if shutting down.
        if not self.is_shutting_down.is_set():
            try:
                self.logs_queue.put_nowait(self.format_record(record))
            except queue.Full:
                pass  # Optionally, handle or log the drop.

    def format_record(self, record):
        # Convert record to dict or desired format for Datadog
        log_data = record.__dict__.get("msg", {})
        if isinstance(log_data, str):
            message = log_data
            properties = {}
        else:
            message = log_data.pop("event", "")
            properties = log_data
        log_entry = {
            "message": message,
            "properties": properties,
            "level": record.levelname,
        }
        return log_entry

    def _submit_logs(self, logs_batch):
        max_retries = 3  # Maximum number of retries
        backoff_factor = 2  # Backoff multiplier
        initial_delay = 0.5  # Initial delay in seconds
        with ApiClient(Configuration()) as api_client:
            api_instance = logs_api.LogsApi(api_client)
            log_items = []
            tags = generate_tags(
                service_name=self.service_name,
                host_name=self.host_name,
                environment=self.environment,
            )
            for log in logs_batch:
                log_item = HTTPLogItem(
                    **log["properties"],
                    ddsource="python",
                    message=log["message"],
                    service=self.service_name,
                    ddtags=tags,
                    host=self.host_name,
                )
                log_items.append(log_item)
            body = HTTPLog(log_items)
            for attempt in range(max_retries + 1):
                try:
                    api_instance.submit_log(body)
                    break  # Break the loop if the submission was successful
                except ApiException as e:
                    if attempt < max_retries:
                        sleep_time = (backoff_factor**attempt) * initial_delay
                        # Adding jitter to prevent the Thundering Herd problem
                        sleep_time += random.uniform(0, 0.2)
                        print(
                            f"Retry #{attempt + 1}: An error occurred while submitting logs to Datadog: {e}. "
                            f"Retrying in {sleep_time} seconds."
                        )
                        time.sleep(sleep_time)
                    else:
                        print(
                            f"Failed to submit logs to Datadog after {max_retries} attempts: {e}"
                        )

    def _worker(self):
        while not self.is_shutting_down.is_set() or not self.logs_queue.empty():
            current_batch = []
            while not self.logs_queue.empty() and len(current_batch) < self.batch_size:
                try:
                    log = self.logs_queue.get_nowait()
                    current_batch.append(log)
                except queue.Empty:
                    break
            if current_batch:
                with self.logs_batch_lock:
                    self._submit_logs(current_batch)
            if not self.is_shutting_down.is_set():
                time.sleep(self.flush_interval)

    def _shutdown_worker(self):
        self.is_shutting_down.set()
        self.thread.join()

    def cleanup(self):
        self._shutdown_worker()

    def close(self):
        self.cleanup()
        super(DatadogCustomLogHandler, self).close()
And this is the function I use to instantiate my logger, which creates a DatadogCustomLogHandler:
import logging
import socket

import structlog

from ts.common.helpers.validation import getenv_with_warning, validate_var_type
from ts.common.logging.handler import DatadogCustomLogHandler
from ts.domain.objects import Environment

host_name = socket.gethostname()

structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.UnicodeDecoder(),
        structlog.processors.EventRenamer("message"),
        structlog.processors.JSONRenderer(),
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    context_class=dict,
)


def init_datadog_logger(
    service_name: str, min_log_level: int = logging.INFO, logger_name: str = None
) -> structlog.stdlib.BoundLogger:
    env = getenv_with_warning("ENV")
    env_service_name = getenv_with_warning("SERVICE_NAME")
    dd_log_batch_size = int(getenv_with_warning("DD_LOG_BATCH_SIZE", 50))
    validate_var_type("service_name", service_name, str, is_optional=False)
    environment = next(
        (member for member in Environment.__members__.values() if member.value == env),
        Environment.LOCAL,
    )
    logger_name = logger_name or env_service_name or "default_logger"
    stdlib_logger = logging.getLogger(logger_name)
    logger = structlog.wrap_logger(stdlib_logger)
    if stdlib_logger.handlers:
        print(
            f"Logger {logger_name if logger_name else 'root'} already initialized, skipping initialization."
        )
        return logger
    stdlib_logger.setLevel(min_log_level)
    if environment:
        datadog_custom_handler = DatadogCustomLogHandler(
            service_name=service_name,
            host_name=host_name,
            environment=environment,
            level=min_log_level,
            batch_size=dd_log_batch_size,
        )
        stdlib_logger.addHandler(datadog_custom_handler)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    file_handler = logging.FileHandler("logfile.log")
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    stdlib_logger.addHandler(file_handler)
    stdlib_logger.propagate = False
    return logger
For instance, I create a timer-triggered function in Azure that outputs 2,500 arbitrary logs every 10 minutes, and I instantiate the logger like this:
logger = init_datadog_logger(service_name=os.getenv("SERVICE_NAME"), logger_name=os.getenv("SERVICE_NAME"))
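The body of the timer function is essentially just a loop over that logger. A simplified sketch (the real function does more work, and the event names here are placeholders):

import azure.functions as func


def main(mytimer: func.TimerRequest) -> None:
    # Roughly 2500 structured log lines per invocation.
    for i in range(2500):
        logger.info("arbitrary_event", iteration=i)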
However, of those 2,500 logs, only around 2,100–2,200 actually make it into Datadog on any given run. The same thing happens when I run it locally, although the loss rate isn’t necessarily the same. How can I work out where the logs are being dropped, and how can I improve this?
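One thing I considered is counting how many records actually reach the Datadog handler, so I at least know whether the loss happens before emit() or somewhere between the queue and the API. A sketch of what I had in mind, using a plain logging.Filter attached to the handler (this counter is not in my current code):

import logging


class CountingFilter(logging.Filter):
    """Counts every record that reaches the handler it is attached to."""

    def __init__(self):
        super().__init__()
        self.count = 0

    def filter(self, record):
        self.count += 1
        return True  # never drop anything, just count


# Attach to the Datadog handler created in init_datadog_logger, run the timer
# function once, then compare counting_filter.count with what Datadog shows:
# counting_filter = CountingFilter()
# datadog_custom_handler.addFilter(counting_filter)

Is that a reasonable way to narrow it down, or is there a better approach?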