How to Process Only Truly New Gmail Messages Using Gmail API and Pub/Sub Notifications?

I’m developing an application using the Gmail API and Google Pub/Sub to process incoming emails. My goal is to only process truly new messages and avoid reprocessing messages that have been moved between labels (e.g., moving a message back into the inbox).

Problem:
The current implementation still processes messages that are moved back into the inbox, resulting in duplicate processing. How can I ensure that only truly new messages are processed and avoid reprocessing messages that are moved back into the inbox?

Considerations:

  • The Gmail API’s historyTypes: ['messageAdded'] seems to include messages that have been moved between labels.
  • I am storing the last processed history ID and timestamp to filter messages.
  • Should I track and store message IDs to avoid reprocessing, or is there a more efficient way to handle this with the Gmail API?

Current Setup:

  • Pub/Sub Notification: I have a Pub/Sub topic that triggers my process_notification function whenever there’s a new message added to the inbox.
  • History API: I’m using the Gmail History API to fetch message history based on the history ID.

Main Question:

How can I best process only truly new incoming messages and avoid reprocessing messages if they are moved back into the inbox(or any other label)?

Environment:

  • Language: Python
  • Framework: Flask
  • Database: MongoDB
  • Gmail API

Attempted Solution:
I tried to filter messages by their internal timestamp, processing only messages with a timestamp greater than the last processed message. I’ve tried storing message ID’s but this would cause a bloated application. Here’s a simplified version of my code:

def start_watch():
    credentials = google.oauth2.credentials.Credentials(**session['credentials'])
    service = build('gmail', 'v1', credentials=credentials)
    
    request = {
        'labelIds': ['INBOX'],
        'topicName': f"projects/{create_app().config['GOOGLE_CLOUD_PROJECT_ID']}/topics/{create_app().config['PUBSUB_TOPIC']}",
        'labelFilterBehavior': 'INCLUDE',
        'historyTypes': ['messageAdded']
    }
    response = service.users().watch(userId='me', body=request).execute()

    mongo.db.users.update_one(
        {'google_id': session['google_id']},
        {'$set': {'historyId': response['historyId'], 'watch_state': True}}
    )

def process_notification(data):
    global last_processed_history_id

    payload = base64.urlsafe_b64decode(data['message']['data'])
    message_data = json.loads(payload)
    
    history_id = message_data['historyId']
    email_address = message_data['emailAddress']
    
    current_time = time.time()

    # Debounce logic
    if email_address in last_processed_history_id:
        last_id, last_time = last_processed_history_id[email_address]
        if history_id == last_id and (current_time - last_time) < debounce_interval:
            print(f"Skipping duplicate notification for history ID {history_id}")
            return

    user = mongo.db.users.find_one({'email': email_address})
    if user:
        credentials = google.oauth2.credentials.Credentials(**user['credentials'])

        try:
            # Refresh credentials if necessary
            if credentials.expired and credentials.refresh_token:
                credentials.refresh(Request())
                mongo.db.users.update_one(
                    {'email': email_address},
                    {'$set': {'credentials': credentials_to_dict(credentials)}}
                )
            elif credentials.expired and not credentials.refresh_token:
                print("No refresh token available. User needs to re-authenticate.")
                return redirect(url_for('auth.reauthenticate'))

            service = build('gmail', 'v1', credentials=credentials)

            # Check if the sender is on the block list for the user
            blocked_senders = mongo.db.block_list.find({'receiver_id': user['_id']})
            blocked_emails = [block['email'] for block in blocked_senders]
            if email_address in blocked_emails:
                results = service.users().history().list(userId='me', startHistoryId=history_id, historyTypes=['messageAdded']).execute()
                if 'history' in results:
                    for history in results['history']:
                        if 'messagesAdded' in history:
                            for message in history['messagesAdded']:
                                apply_label(service, message['message']['id'], 'Blocked - PitchSlap')
                                remove_label(service, message['message']['id'], 'INBOX')
                return

            last_history_id = user.get('historyId', '1')
            last_processed_message_id = user.get('lastProcessedMessageId', '0')
            last_processed_timestamp = user.get('lastProcessedTimestamp', 0)

            if int(history_id) <= int(last_history_id):
                print(f"History ID {history_id} has already been processed.")
                return

            results = service.users().history().list(userId='me', startHistoryId=last_history_id, historyTypes=['messageAdded']).execute()

            if 'history' in results:
                for history in results['history']:
                    if 'messagesAdded' in history:
                        for message_added in history['messagesAdded']:
                            message_id = message_added['message']['id']
                            msg = service.users().messages().get(userId='me', id=message_id).execute()
                            
                            if 'INBOX' not in msg.get('labelIds', []):
                                print(f"Message {message_id} not in INBOX.")
                                continue
                                
                            msg_internal_date = int(msg.get('internalDate', 0)) / 1000
                            
                            if msg_internal_date <= last_processed_timestamp:
                                print(f"Message {message_id} with timestamp {msg_internal_date} has already been processed.")
                                continue

                            headers = msg.get('payload', {}).get('headers', [])
                            sender = next((h['value'] for h in headers if h['name'] == 'From'), 'Unknown')
                            receiver = next((h['value'] for h in headers if h['name'] == 'To'), 'Unknown')
                            subject = next((h['value'] for h in headers if h['name'] == 'Subject'), 'No Subject')
                            snippet = msg.get('snippet', '')

                            sender_email = extract_email_address(sender)

                            email_content = f"Subject: {subject}nn{snippet}"
                            ai_response = check_sales_email_with_openai(email_content)
                            is_sales_email = ai_response["Sales Email"]
                            confidence = ai_response["Confidence"]

                            if is_sales_email and confidence >= 70:
                                apply_label(service, message_id, 'PitchSlap')
                                remove_label(service, message_id, 'INBOX')
                                pitch = save_sales_pitch(
                                    service, message_id, sender_email, receiver, subject, snippet, email_address, user['_id'],
                                    ai_response
                                )
                                send_template_email(service, sender_email, 'form_request', pitch)
                                if not pitch.get('submitted_at'):
                                    apply_label(service, message_id, 'Quarantined - PitchSlap')

                            print(f"History ID: {history_id}")
                            print(f"Email Address: {email_address}")
                            print(f"Sender: {sender_email}")
                            print(f"Receiver: {receiver}")
                            print(f"Subject: {subject}")
                            print(f"Email Content: {snippet}")
                            print(f"Sales email detected: {is_sales_email}")

                            mongo.db.users.update_one(
                                {'email': email_address},
                                {'$set': {'lastProcessedMessageId': message_id, 'lastProcessedTimestamp': msg_internal_date}}
                            )

        except googleapiclient.errors.HttpError as error:
            if error.resp.status == 404:
                print(f"Message {message_id} not found.")
                continue
            else:
                raise

            # Update the history ID in the database after processing all messages
            mongo.db.users.update_one(
                {'email': email_address},
                {'$set': {'historyId': history_id}}
            )

            # Update the last processed history ID and timestamp
            last_processed_history_id[email_address] = (history_id, current_time)

        except google.auth.exceptions.RefreshError:
            print("Failed to refresh token, please reauthenticate.")
            return redirect(url_for('auth.reauthenticate'))

**User DB Entry: **

{
  "_id": {"$oid": "6xxxxxa"},
  "google_id": "1xxxxx7",
  "email": "[email protected]",
  "credentials": {
    "token": "yxxxxxx69",
    "refresh_token": "1xxxxxxxM",
    "token_uri": "htxxxxen",
    "client_id": "22xxxxxcom",
    "client_secret": "GOxxxxxc",
    "scopes": [
      "https://www.googleapis.com/auth/userinfo.profile",
      "https://www.googleapis.com/auth/userinfo.email",
      "openid",
      "https://www.googleapis.com/auth/gmail.readonly",
      "https://www.googleapis.com/auth/gmail.modify"
    ]
  },
  "historyId": {"$numberInt": "140436"},
  "watch_state": true,
  "last_processed_timestamp": {"$numberDouble": "1721765846.9436474"},
  "lastProcessedMessageId": "190e1507ff1729a4"
}

Console.

I restarted the program. It then went through and reprocessed everything. Properly processed the first email. I then tested it and moved it into the inbox and it then started processing it in a loop.

Starting to process history for user [email protected] from history ID 139567 to 139707
Starting to process history for user [email protected] from history ID 139567 to 139671
Message 190e13de8a820465 not found.
Message 190e13de810426fb not found.
Message 190e13de8a820465 not found.
Starting to process history for user [email protected] from history ID 139567 to 139726
Message 190e13df993fbc8b not found.
Message 190e13de810426fb not found.
Message 190e13dfcdc2e0bd not found.
Message 190e13df993fbc8b not found.
Message 190e13dfe60bbb9b not found.
127.0.0.1 - - [23/Jul/2024 16:37:03] "POST /pubsub HTTP/1.1" 200 -
Message 190e13dfcdc2e0bd not found.
Message 190e13dfe60bbb9b not found.
Message 190e13de8a820465 not found.
127.0.0.1 - - [23/Jul/2024 16:37:03] "POST /pubsub HTTP/1.1" 200 -
Message 190e13de810426fb not found.
Starting to process history for user [email protected] from history ID 139707 to 139808
Starting to process history for user [email protected] from history ID 139707 to 139755
Message 190e13df993fbc8b not found.
Message 190e13dfcdc2e0bd not found.
Message 190e13dfe60bbb9b not found.
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user [email protected] from history ID 139726 to 140355
Message 190e13dfe60bbb9b not found.
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
Message 190e13dfe60bbb9b not found.
History ID 139587 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
127.0.0.1 - - [23/Jul/2024 16:37:04] "POST /pubsub HTTP/1.1" 200 -
History ID 139638 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
History ID 139605 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
History ID 139650 has already been processed.
127.0.0.1 - - [23/Jul/2024 16:37:05] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user [email protected] from history ID 140355 to 140426
Template path: C:Userstorilpitchslapv5backendapp..templates
Starting to process history for user [email protected] from history ID 140355 to 140456
History ID: 140426
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 16:37:32 -0400
Email Content: you want to buy this lead list or something
Sales email detected: True
Message 190e1507ff1729a4 not in INBOX.
Starting to process history for user [email protected] from history ID 140355 to 140436
Message 190e150993252c08 not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:37:53] "POST /pubsub HTTP/1.1" 200 -
127.0.0.1 - - [23/Jul/2024 16:37:53] "POST /pubsub HTTP/1.1" 200 -
Message 190e1507ff1729a4 not in INBOX.
Message 190e150993252c08 not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:37:53] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user [email protected] from history ID 140436 to 140507
Starting to process history for user [email protected] from history ID 140436 to 140511
Template path: C:Userstorilpitchslapv5backendapp..templates
Starting to process history for user [email protected] from history ID 140436 to 140520
Message 190e150993252c08 not in INBOX.
Message 190e150993252c08 not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:38:36] "POST /pubsub HTTP/1.1" 200 -
History ID: 140507
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 20:37:53 +0000
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/[email protected]
Sales email detected: True
Starting to process history for user [email protected] from history ID 140511 to 140540
127.0.0.1 - - [23/Jul/2024 16:38:37] "POST /pubsub HTTP/1.1" 200 -
Starting to process history for user [email protected] from history ID 140507 to 140563
Template path: C:Userstorilpitchslapv5backendapp..templates
Starting to process history for user [email protected] from history ID 140507 to 140575
Message 190e15145fe96b7c not in INBOX.
127.0.0.1 - - [23/Jul/2024 16:38:41] "POST /pubsub HTTP/1.1" 200 -
Message 190e15145fe96b7c not in INBOX.
Template path: C:Userstorilpitchslapv5backendapp..templates
History ID: 140520
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 20:38:38 +0000
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/[email protected]
Sales email detected: True
127.0.0.1 - - [23/Jul/2024 16:38:42] "POST /pubsub HTTP/1.1" 200 -
History ID: 140540
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 20:38:38 +0000
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/[email protected]
Sales email detected: True
Starting to process history for user [email protected] from history ID 140520 to 140595
Starting to process history for user [email protected] from history ID 140520 to 140629
127.0.0.1 - - [23/Jul/2024 16:38:42] "POST /pubsub HTTP/1.1" 200 -
Message 190e15145fe96b7c has already been processed.
Message 190e15145fe96b7c has already been processed.
Starting to process history for user [email protected] from history ID 140540 to 140646
Template path: C:Userstorilpitchslapv5backendapp..templates
Starting to process history for user [email protected] from history ID 140540 to 140658
Message 190e151573f8d86e not in INBOX.
Message 190e151573f8d86e not in INBOX.
History ID: 140575
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 13:38:42 -0700
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/[email protected]
Sales email detected: True
Starting to process history for user [email protected] from history ID 140540 to 140677
127.0.0.1 - - [23/Jul/2024 16:38:46] "POST /pubsub HTTP/1.1" 200 -
Message 190e151573f8d86e not in INBOX.
Template path: C:Userstorilpitchslapv5backendapp..templates
Template path: C:Userstorilpitchslapv5backendapp..templates
Starting to process history for user [email protected] from history ID 140575 to 140733
Starting to process history for user [email protected] from history ID 140575 to 140715
History ID: 140595
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 13:38:42 -0700
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/[email protected]
Sales email detected: True
History ID: 140629
Email Address: [email protected]
Sender: [email protected]
Receiver: [email protected]
Subject: hey
Date: Tue, 23 Jul 2024 13:38:42 -0700
Email Content: This email has been identified as a sales pitch. Please fill out the pitch form here: https://8807-97-145-180-177.ngrok-free.app/[email protected]
Sales email detected: True

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật