Background:
My function extracts data from a CSV file and generates JSON using two proto files. I need to add another three fields that are not defined in the proto files and produce a final JSON. Later, this JSON is serialized and sent to RabbitMQ.
My code:
import argparse
import base64
import csv
import decimal
import json
import logging
import os
import time
from datetime import date, datetime
from pathlib import Path

import boto3
import chardet
import dotenv
import pandas as pd
import pika
from google.protobuf import json_format
from google.protobuf import timestamp_pb2
from google.protobuf import wrappers_pb2 as wrappers
from google.protobuf.json_format import MessageToDict, MessageToJson, Parse
from google.protobuf.timestamp_pb2 import Timestamp

from document_parser.params import Paths, Enviorment, rabbitmq
from document_parser.pdf import PdfParser
from document_parser.question_extractor import QuestionExtractor, load_questions, Question, ValidTypes
from document_parser.status_file import *
from document_parser.uploads_downloads import *
from document_parser.utils import Timer
from python import model_pb2, events_pb2
from python.events_pb2 import AnswerUpdateRequest
from python.model_pb2 import AnswerSource, Answer, AnswerFieldType, Entity
# from proto_definitions import create_answer_type
# from proto import model
# from python.model_pb2 import AnswerUpdateRequest
def generate_final_json(json_decoded_data, document_type, file_generated_time, parsed_pdf_file_path):
    """Build the outbound JSON payload destined for RabbitMQ.

    Reads the LLM-extracted answers CSV for the parsed document, packs every
    answered row into an ``AnswerUpdateRequest`` protobuf, then wraps the
    proto's JSON form together with three extra keys (``documentKey``,
    ``applicationId``, ``activityId``) that have no counterpart in the proto
    definitions. The result is also written to disk next to the CSV.

    Args:
        json_decoded_data: dict from the inbound message; must contain the
            keys 'orgId', 'documentKey', 'applicationId' and 'activityId'.
        document_type: document-type label embedded in each answer's source id.
        file_generated_time: ISO-8601 timestamp string used for
            ``Answer.provided_at``.
        parsed_pdf_file_path: Path to the parsed PDF; its parent directory
            name keys both the input CSV and the output JSON filenames.

    Returns:
        The final JSON-serializable dict, ready to be serialized and published.
    """
    csv_path = Paths.llm_prediction_dir.joinpath(f"{parsed_pdf_file_path.parent.name}_extracted.csv")
    print("csv_path is " + str(csv_path))

    answer_update_request = AnswerUpdateRequest()
    answer_update_request.entity.type = Entity.ORGANIZATION
    answer_update_request.entity.id = json_decoded_data['orgId']

    # newline='' is the documented way to open files for the csv module.
    with open(csv_path, 'r', newline='') as csv_file:
        csv_reader = csv.DictReader(csv_file)
        for row in csv_reader:
            if row["answer"] == "No answer":
                continue  # skip unanswered fields entirely

            answer = model_pb2.Answer()
            answer.key = row["api_field_name"]
            answer.source.type = model_pb2.AnswerSource.DOCUMENT
            answer.source.id = f"{json_decoded_data['documentKey']} | {document_type}"

            # BUG FIX: the original built the Timestamp but never called
            # FromDatetime, so provided_at was always the epoch default.
            provided_at = Timestamp()
            provided_at.FromDatetime(datetime.fromisoformat(file_generated_time))
            answer.provided_at.CopyFrom(provided_at)

            received_at = Timestamp()
            received_at.FromDatetime(datetime.now())
            answer.received_at.CopyFrom(received_at)

            answer.value.text = row["answer"]
            answer_update_request.answers.append(answer)

    # Round-trip the proto through JSON so the extra, non-proto keys can be
    # merged into a plain dict alongside it.
    answer_update_request_json = json_format.MessageToJson(answer_update_request, including_default_value_fields=True)
    answer_update_request_dict = json.loads(answer_update_request_json)

    # These three keys are not part of the proto files, so they are added
    # manually at the top level of the final payload.
    final_json = {
        "answerUpdateRequest": answer_update_request_dict,
        "documentKey": json_decoded_data['documentKey'],
        "applicationId": json_decoded_data["applicationId"],
        "activityId": json_decoded_data["activityId"]
    }

    # Persist a copy for inspection/debugging.
    final_json_generated = json.dumps(final_json, indent=2)
    output_json_path = Paths.llm_prediction_dir.joinpath(f"{parsed_pdf_file_path.parent.name}_with_labels.json")
    with open(output_json_path, 'w') as json_file:
        json_file.write(final_json_generated)
    print(f"Protobuf message written to {output_json_path}")

    # BUG FIX: the original line was `return // send serilaized message`,
    # which is a syntax error in Python. Return the dict so the caller can
    # serialize it and publish to RabbitMQ.
    return final_json
As you can see below, these 3 keys are not present in the proto files, so I am adding them manually:
"documentKey": json_decoded_data['documentKey'],
"applicationId": json_decoded_data["applicationId"],
"activityId": json_decoded_data["activityId"]
model.proto:
syntax = "proto3";
package com.embroker.oracle;
import "google/protobuf/timestamp.proto";
option go_package = "github.com/embroker/oracle/sdk/go_sdk";
// The entity a set of answers belongs to.
message Entity {
// Kind of entity; defaults to ORGANIZATION (enum value 0).
Type type = 1;
// Opaque identifier of the entity (the Python code passes orgId here).
string id = 2;
enum Type {
ORGANIZATION = 0;
USER = 1;
APPLICATION = 2;
}
}
// Where an answer originated from.
message AnswerSource {
// Origin kind; defaults to UNKNOWN (enum value 0).
Type type = 1;
// Identifier of the source (the Python code uses "<documentKey> | <document_type>").
string id = 2;
enum Type {
UNKNOWN = 0;
USER = 1;
DOCUMENT = 2;
EXTERNAL = 3;
}
}
// A single extracted answer keyed by API field name.
message Answer {
// API field name this answer corresponds to.
string key = 1;
// Where the answer came from (document, user, ...).
AnswerSource source = 2;
// When the answer was originally produced.
google.protobuf.Timestamp provided_at = 3;
// When this system received/processed the answer.
google.protobuf.Timestamp received_at = 4;
// Declared field type of the answer (see AnswerFieldType).
AnswerFieldType type = 5;
Value value = 6;
// The answer payload; exactly one variant of the oneof is set.
message Value {
oneof value {
string text = 1;
float decimal = 2;
// ...
}
}
}
// Declared data type of an Answer's value.
enum AnswerFieldType {
ANSWER_FIELD_TYPE_UNSTRUCTURED = 0; // Can be useful for LLM purposes
ANSWER_FIELD_TYPE_TEXT = 1;
ANSWER_FIELD_TYPE_INTEGER = 2;
ANSWER_FIELD_TYPE_BOOLEAN = 3;
ANSWER_FIELD_TYPE_DECIMAL = 4;
ANSWER_FIELD_TYPE_DATE = 5;
ANSWER_FIELD_TYPE_ADDRESS = 6;
}
events.proto:
syntax = "proto3";
package com.embroker.oracle;
import "model.proto";
option go_package = "github.com/embroker/oracle/sdk/go_sdk";
// This is the inbound message intended to inform the Oracle of new answers to be persisted
message AnswerUpdateRequest {
// The entity the answers belong to.
Entity entity = 1;
// The new/updated answers to persist.
repeated Answer answers = 2;
}
// This is the outbound message informing Oracle subscribers of new answers
message AnswersUpdated {
// The entity whose answers changed.
Entity entity = 1;
// The answers that were just persisted.
repeated Answer answers = 2;
}