I have an ADF pipeline with an API call that returns data in chunks (1000 records max per call). The pipeline works if you only need 1000 users. I have been trying to figure out a way to call the API again (incrementing the offset by 1000 each time) until the response is empty.
NOTE: A single API call returns at most 1000 records.
API URL: https://api.<application_name>.com/api/v1/search/users
Method: POST
Body:
`query={
"terms":[
{"type":"parent_account","terms":[
{"type":"string","term":"status_group","in_list":["paying"]}
]
}
],
"count":1000,
"offset":0,
"fields":[
{"type":"date","term":"last_activity_time","field_display_name":"Last activity","desc":true},{"type":"named_aggregation","aggregation":"total_activities","duration":14,"field_display_name":"Activities (14d)"}
],
"scope":"all"
}`
I need to increase the offset by 1000 in the body on each call to retrieve the next 1000 records. Since there are more than 200,000 records, I'll have to keep increasing the offset by 1000 on every API call until I reach 200,000 in order to fetch them all.
One solution I tried (shown in the screenshots above) uses an Until activity, but it only runs for the first 1000 records and never fetches the rest.
I am looking for a single file in the blob storage location, in JSON format, containing all the records (covering every offset until the API responds with 0 records).
Any suggestions on how to approach this, and which activities are needed, would be appreciated.
To increase the offset by 1000 in the body query on each call and retrieve the next 1000 records, you can iterate either inside the pipeline itself or by uploading a Python script and executing it against the data:
You could try to drive the offset from @range(...) expressions (note that in ADF range(startIndex, count) takes a start index and a count, not an end value), but it is simpler to keep a single Int offset variable and add two more Int variables, batchSize for the page size and maxRecords for the total to fetch (the Until loop sketch after the variables below shows how they fit together):
"body": {"terms": [{"type": "parent_account","terms": [{"type": "string","term": "status_group","in_list": ["paying"]}]}],
"count": 1000,
"offset": "@variables('offset')",
"fields": [{"type": "date","term": "last_activity_time","field_display_name": "Last activity","desc": true},
{"type": "named_aggregation","aggregation": "total_activities","duration": 14,"field_display_name": "Activities 14 days"}],"scope": "all"}}},
{"name": "IncrementOffset","type": "SetVariable","typeProperties": {"variableName": "offset","value": "@add(variables('offset'), variables('batchSize'))"
The variables may already be defined; if not, they should look like this:
"variables": {
"offset": {
"type": "Int",
"defaultValue": 0
},
"batchSize": {
"type": "Int",
"defaultValue": 1000
},
"maxRecords": {
"type": "Int",
"defaultValue": 200000
}
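These pieces sit inside an Until activity that keeps calling the API and incrementing the offset until it reaches maxRecords. Below is a minimal sketch of that loop; the activity names and the extra Int variable tempOffset are placeholders of mine, the Copy activity's source/sink settings are omitted (its source uses the body shown above), and the increment is split across tempOffset because ADF's Set Variable activity does not allow a variable to reference itself in its own value expression:

{
    "name": "UntilAllRecords",
    "type": "Until",
    "typeProperties": {
        "expression": {
            "value": "@greaterOrEquals(variables('offset'), variables('maxRecords'))",
            "type": "Expression"
        },
        "timeout": "0.12:00:00",
        "activities": [
            {
                "name": "CopyCurrentBatch",
                "type": "Copy",
                "typeProperties": {}
            },
            {
                "name": "SetTempOffset",
                "type": "SetVariable",
                "dependsOn": [{"activity": "CopyCurrentBatch", "dependencyConditions": ["Succeeded"]}],
                "typeProperties": {
                    "variableName": "tempOffset",
                    "value": "@add(variables('offset'), variables('batchSize'))"
                }
            },
            {
                "name": "IncrementOffset",
                "type": "SetVariable",
                "dependsOn": [{"activity": "SetTempOffset", "dependencyConditions": ["Succeeded"]}],
                "typeProperties": {
                    "variableName": "offset",
                    "value": "@variables('tempOffset')"
                }
            }
        ]
    }
}

Each iteration writes one batch to blob storage; to end up with the single JSON file you asked for, one option is a final Copy activity with the "Merge files" copy behavior over the per-batch blobs, though for a strict JSON array you may need a custom merge step instead.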
The Python script is a different but equally workable route: upload it to an Azure Batch pool and run it from the pipeline with a Custom activity (see https://learn.microsoft.com/en-us/azure/batch/tutorial-run-python-batch-azure-data-factory#create-and-run-the-pipeline and the activity sketch after the script below):
The script below writes the output to blob storage; if the target is a different type of data store, it has to be modified:
# Needs API authentication headers if run outside ADF
import json

import requests
from azure.storage.blob import BlobServiceClient  # pip install azure-storage-blob

# Storage account credentials for the target container
connection_string = "<storage-account-connection-string>"
container_name = "your_container_name_here"

# Connect to the blob container that will hold the output
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)

# The search API endpoint from the question
url = "https://api.<application_name>.com/api/v1/search/users"
headers = {"Content-Type": "application/json"}
batch_size = 1000
max_records = 200000

# Fetch one batch of records from the API
def fetch_data(offset):
    payload = {
        "terms": [{"type": "parent_account", "terms": [
            {"type": "string", "term": "status_group", "in_list": ["paying"]}
        ]}],
        "count": batch_size,
        "offset": offset,
        "fields": [
            {"type": "date", "term": "last_activity_time", "field_display_name": "Last activity", "desc": True},
            {"type": "named_aggregation", "aggregation": "total_activities", "duration": 14, "field_display_name": "Activities (14d)"}
        ],
        "scope": "all"
    }
    response = requests.post(url, headers=headers, data=json.dumps(payload))
    response.raise_for_status()
    return response.json()

# Pull batch after batch until max_records is reached or the API returns no records
all_records = []
offset = 0
while offset < max_records:
    data = fetch_data(offset)
    # Assumes the response is (or contains) the list of records;
    # adjust to the real response schema, e.g. data["users"]
    records = data if isinstance(data, list) else data.get("users", [])
    if not records:
        break
    all_records.extend(records)
    offset += batch_size

# Write everything to a single JSON blob
blob_client = container_client.get_blob_client("all_users.json")
blob_client.upload_blob(json.dumps(all_records), overwrite=True)
Then download or preview the resulting file from the container.
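For completeness, the Custom activity that runs this script on an Azure Batch pool (as in the linked tutorial) looks roughly like the following; the activity name, linked service names, script name and folder path are placeholders to replace with your own:

{
    "name": "RunUserExportScript",
    "type": "Custom",
    "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
    },
    "typeProperties": {
        "command": "python fetch_users.py",
        "resourceLinkedService": {
            "referenceName": "BlobStorageLinkedService",
            "type": "LinkedServiceReference"
        },
        "folderPath": "scripts"
    }
}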