I’m working with the Gemini 1.5 Pro API, which has a rate limit of 1,000 calls per minute per API key (at least according to the documentation). I need to process 14,000 entries from an Excel file as fast as possible.
To handle the rate limits, I:
1. Use multiple API keys (5 keys stored in a TXT file).
2. Distribute calls evenly across the keys, ensuring no key exceeds the 1,000 calls/minute limit.
3. Add a time.sleep(0.1) delay between calls per key (implemented as a token bucket in the script below) to stay within the rate limit.
However, even with just one API key and properly adjusted time.sleep, I still encounter 429: Resource has been exhausted errors.
Why would I hit rate limits even with these precautions? Is there an undocumented rate limit, or am I missing something in how rate limits are calculated?
Any advice or debugging suggestions would be appreciated.
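For illustration, even a stripped-down single-key loop along these lines (a simplified sketch, not my actual script; prompts is just a placeholder for the real prompts built from the Excel rows) eventually runs into the 429:

import time
import google.generativeai as genai

genai.configure(api_key="MY_SINGLE_KEY")  # one key only
model = genai.GenerativeModel(model_name="gemini-1.5-pro-002")

prompts = ["..."]  # placeholder for the real prompts
for i, prompt in enumerate(prompts):
    response = model.generate_content(prompt)  # one request at a time
    print(i, response.text.strip())
    time.sleep(0.1)  # ~10 requests/sec, i.e. ~600/min, well below the documented 1,000/min

Here is the full multi-key script: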
import os
import pandas as pd
import google.generativeai as genai
import threading
import queue
import time
import math
# --------------------------------------------------------------------------
# CONFIGURATION
# --------------------------------------------------------------------------
# Path to the API keys
api_keys_path = os.path.join(os.path.dirname(__file__), '..', 'api_keys_multiplex.txt')
with open(api_keys_path, 'r') as file:
    api_keys = [k.strip() for k in file if k.strip()]
if not api_keys:
    raise ValueError("No API keys found in api_keys_multiplex.txt.")
# Target: ~1000 requests/minute => ~16.67 requests/sec => one token every ~0.06s
TOKENS_PER_MINUTE = 1000
TOKEN_INTERVAL = 60.0 / TOKENS_PER_MINUTE  # ~0.06 s per token
# Number of worker threads per key (more threads help bridge the waiting time)
WORKERS_PER_KEY = 5
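# Sanity check (my own back-of-the-envelope numbers): with 5 keys at 1,000 requests/minute
# each, the 14,000 rows should finish in roughly 14,000 / 5,000 ≈ 3 minutes if the pacing
# actually works as intended.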
# System prompt for Gemini
system_instructions = """
You are an AI assistant tasked with analyzing Bombora Intent Topics and determining whether they are relevant to the business focus of the company described in the user prompt.
### Input:
You will receive the following columns for each topic:
- **Theme**: The overarching theme (e.g., BioTech, Software).
- **Category**: The subcategory under the theme.
- **Topic Name**: The specific name of the topic.
- **Description**: A description of the topic's meaning.
### Task:
Your task is to:
1. Analyze the topic based on the information provided in the columns.
2. Strictly determine whether the topic aligns with the company's purpose as described in the user prompt.
### Notes:
- Be very strict. Only mark topics as relevant if they have a direct and clear connection to the company's purpose.
- Respond with "Yes" or "No" only, without any explanations.
"""
generation_config = {
    "temperature": 0.3,
    "top_p": 0.9,
    "top_k": 40,
    "max_output_tokens": 5,
    "response_mime_type": "text/plain",
}
current_dir = os.path.dirname(__file__)
excel_files = [f for f in os.listdir(current_dir) if f.endswith('.xlsx')]
if len(excel_files) != 1:
    raise ValueError("There must be exactly one Excel file in the current directory!")
file_name = excel_files[0]
output_file_name = "filtered_output.xlsx"
theme_col = "Theme"
category_col = "Category"
topic_name_col = "Topic Name"
description_col = "Description"
data_df = pd.read_excel(os.path.join(current_dir, file_name))
# Queue holding all rows
task_queue = queue.Queue()
for index, row in data_df.iterrows():
    theme = row[theme_col] if pd.notna(row[theme_col]) else ""
    category = row[category_col] if pd.notna(row[category_col]) else ""
    topic_name = row[topic_name_col] if pd.notna(row[topic_name_col]) else ""
    description = row[description_col] if pd.notna(row[description_col]) else ""
    task_queue.put((index, theme, category, topic_name, description))
results = {}
results_lock = threading.Lock()
# Token bucket mechanism, one per key
class TokenBucket:
    def __init__(self, tokens_per_minute=TOKENS_PER_MINUTE):
        self.rate = tokens_per_minute / 60.0  # tokens per second
        self.capacity = tokens_per_minute
        self.tokens = tokens_per_minute
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def get_token(self):
        # Try to acquire a token; block while none is available
        while True:
            with self.lock:
                now = time.time()
                # Refill tokens
                elapsed = now - self.last_refill
                # Number of tokens that can be added since the last refill
                new_tokens = elapsed * self.rate
                if new_tokens > 0:
                    self.tokens = min(self.capacity, self.tokens + new_tokens)
                    self.last_refill = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
            time.sleep(0.1)  # wait briefly and try again
def worker(api_key, task_queue, results, results_lock, token_bucket):
    # Each worker thread gets its own session with this key
    genai.configure(api_key=api_key)
    model = genai.GenerativeModel(
        model_name="gemini-1.5-pro-002",
        system_instruction=system_instructions,
        generation_config=generation_config,
    )
    chat_session = model.start_chat()
    while True:
        try:
            index, theme, category, topic_name, description = task_queue.get_nowait()
        except queue.Empty:
            break
        user_prompt = f"""
We are a company focused on process optimization using digital solutions and artificial intelligence. Our goal is to streamline business operations and improve efficiency through advanced technologies.
Topic Details:
- **Theme**: {theme}
- **Category**: {category}
- **Topic Name**: {topic_name}
- **Description**: {description}
Does this topic directly align with our company's purpose?
"""
        try:
            # Acquire a token before sending the request
            token_bucket.get_token()
            response = chat_session.send_message(user_prompt)
            relevance = response.text.strip()
            with results_lock:
                results[index] = relevance
            print(f"Row {index} classified: {relevance}")
        except Exception as e:
            with results_lock:
                results[index] = f"Error: {e}"
            print(f"Row {index} error: {e}")
        finally:
            task_queue.task_done()
# Create a separate TokenBucket for each key
token_buckets = [TokenBucket(tokens_per_minute=TOKENS_PER_MINUTE) for _ in api_keys]
threads = []
# Start several workers for each key
for key, bucket in zip(api_keys, token_buckets):
    for _ in range(WORKERS_PER_KEY):
        t = threading.Thread(target=worker, args=(key, task_queue, results, results_lock, bucket))
        t.start()
        threads.append(t)
# Wait until everything has been processed
for t in threads:
    t.join()
relevances = [results[i] for i in range(len(data_df))]
data_df["Relevance"] = relevances
filtered_df = data_df[data_df["Relevance"] == "Yes"]
filtered_df.to_excel(os.path.join(current_dir, output_file_name), index=False)
print(f"Process finished. Filtered file saved: {output_file_name}")
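As a debugging aid, I was also thinking of measuring the actual request rate per key with a small thread-safe counter, roughly like this (just a sketch; RequestCounter is a hypothetical helper I would add, it is not part of the script above):

import collections
import threading
import time

class RequestCounter:
    """Thread-safe sliding-window counter to check the real requests/minute per key."""
    def __init__(self):
        self.lock = threading.Lock()
        self.timestamps = collections.defaultdict(list)  # api_key -> call timestamps

    def record(self, api_key):
        now = time.time()
        with self.lock:
            # keep only the calls from the last 60 seconds
            recent = [t for t in self.timestamps[api_key] if now - t < 60]
            recent.append(now)
            self.timestamps[api_key] = recent
            print(f"{api_key[:6]}...: {len(recent)} requests in the last 60s")

I would call record(api_key) right before chat_session.send_message(user_prompt) in worker() to see whether any key really exceeds 1,000 requests per minute.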