I have a large number of documents (millions) in an index.
I'm trying to fetch them with Logstash, call an API to enrich the documents, and reindex them. The problem is that if something goes wrong and Logstash shuts down, I don't want to have to restart the process from the beginning.
I saw in the Logstash documentation that I can use a point in time (PIT) with search_after in order to resume the process.
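For reference, this is the flow from the Elasticsearch docs that I'm trying to reproduce, sketched with curl against my local cluster (the values in angle brackets are placeholders):

# 1. Open a point in time on the index; the response contains a PIT id
curl -u elastic:123qweASD. -X POST "http://localhost:9200/test_data/_pit?keep_alive=5m"

# 2. Search using that PIT id (no index in the URL, the PIT pins it) with a
#    stable sort; each following page repeats the call with search_after set
#    to the sort values of the last hit of the previous page
curl -u elastic:123qweASD. -X POST "http://localhost:9200/_search" -H 'Content-Type: application/json' -d'
{
  "size": 10000,
  "sort": [ {"last_updated": "asc"} ],
  "pit": { "id": "<id returned by step 1>", "keep_alive": "5m" },
  "search_after": [ <sort values of the last hit from the previous page> ]
}'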
This is the Logstash config file:
input {
  elasticsearch {
    type => "standard_index"
    hosts => [ "http://localhost:9200/" ]
    query => '{
      "size": 10000,
      "sort": [
        {"last_updated": "asc"}
      ],
      "pit": {
        "id": "uMyMBAEJdGVzdF9kYXRhFlNXQUpCaWgwUnZlb3RxMUU2WTdMM0EAFkFzbUpDR2R4Uy1XT05uWnVHYjZYOVEAAAAAAAAAFcgWT0FwS2x3RWhUbi12a1U0aWtvckVLUQABFlNXQUpCaWgwUnZlb3RxMUU2WTdMM0EAAA==",
        "keep_alive": "5m"
      }
    }'
    ssl_verification_mode => "none"
    user => "elastic"
    password => "123qweASD."
    index => "test_data"
    #schedule => "*/20 * * * *"
    search_api => "search_after"
    docinfo => true
    docinfo_target => "[@metadata][doc]"
    docinfo_fields => ["_index", "_type", "_id", "_source"]
  }
}
filter {
  mutate {
    add_field => { "all_data" => "%{[@metadata][doc]}" }
  }
  if [type] == "standard_index" {
    http {
      url => "http://localhost:5095/Ltp/ApplyLTP"
      verb => "POST"
      target_body => "api_response_test"
    }
  }
  if [type] == "exception_index" {
    http {
      url => "http://localhost:5095/Ltp/HandleLTPException"
      verb => "POST"
      query => { "id" => "%{[@metadata][doc][_source]}" }
      target_body => "api_response"
    }
  }
}
output {
  elasticsearch {
    hosts => [ "http://localhost:9200" ]
    ssl_verification_mode => "none"
    user => "elastic"
    password => "123qweASD."
    index => "test_index_multiple_inputs"
  }
  stdout {
    codec => dots
  }
}
If I put the pit parameter in the query, Logstash shuts down, saying the request has a duplicate pit field:
[2024-06-08T20:36:18,405][ERROR][logstash.inputs.elasticsearch.searchafter][main][dda1999ccd9a63f515e4483e9dbccd8ab077af231b1eef6540b0495b4ab7228b] Tried search_after paginated search unsuccessfully {:message=>"[400] {"error":{"root_cause":[{"type":"x_content_parse_exception","reason":"[1:250] Duplicate field 'pit'\n at [Source: (byte[])\"{\"size\":10000,\"sort\":[{\"last_updated\":\"asc\"}],\"pit\":{\"id\":\"uMyMBAEJdGVzdF9kYXRhFlNXQUpCaWgwUnZlb3RxMUU2WTdMM0EAFkFzbUpDR2R4Uy1XT05uWnVHYjZYOVEAAAAAAAAAFcgWT0FwS2x3RWhUbi12a1U0aWtvckVLUQABFlNXQUpCaWgwUnZlb3RxMUU2WTdMM0EAAA==\",\"keep_alive\":\"5m\"},\"pit\":{\"id\":\"uMyMBAEJdGVzdF9kYXRhFlNXQUpCaWgwUnZlb3RxMUU2WTdMM0EAFkFzbUpDR2R4Uy1XT05uWnVHYjZYOVEAAAAAAAAAFkQWT0FwS2x3RWhUbi12a1U0aWtvckVLUQABFlNXQUpCaWgwUnZlb3RxMUU2WTdMM0EAAA==\",\"keep_alive\":\"1m\"}}\"; line: 1, column: 250]"}],"type":"x_content_parse_exception","reason":"[1:250] Duplicate field 'pit'\n at [Source: (byte[])\"{\"size\":10000,\"sort\":[{\"last_updated\":\"asc\"}],\"pit\":{\"id\":\"uMyMBAEJdGVzdF9kYXRhFlNXQUpCaWgwUnZlb3RxMUU2WTdMM0EAFkFzbUpDR2R4Uy1XT05uWnVHYjZYOVEAAAAAAAAAFcgWT0FwS2x3RWhUbi12a1U0aWtvckVLUQABFlNXQUpCaWgwUnZlb3RxMUU2WTdMM0EAAA==\",\"keep_alive\":\"5m\"},\"pit\":{\"id\":\"uMyMBAEJdGVzdF9kYXRhFlNXQUpCaWgwUnZlb3RxMUU2WTdMM0EAFkFzbUpDR2R4Uy1XT05uWnVHYjZYOVEAAAAAAAAAFkQWT0FwS2x3RWhUbi12a1U0aWtvckVLUQABFlNXQUpCaWgwUnZlb3RxMUU2WTdMM0EAAA==\",\"keep_alive\":\"1m\"}}\"; line: 1, column: 250]","caused_by":{"type":"json_parse_exception","reason":"Duplicate field 'pit'\n at [Source: (byte[])\"{\"size\":10000,\"sort\":[{\"last_updated\":\"asc\"}],\"pit\":{\"id\":\"uMyMBAEJdGVzdF9kYXRhFlNXQUpCaWgwUnZlb3RxMUU2WTdMM0EAFkFzbUpDR2R4Uy1XT05uWnVHYjZYOVEAAAAAAAAAFcgWT0FwS2x3RWhUbi12a1U0aWtvckVLUQABFlNXQUpCaWgwUnZlb3RxMUU2WTdMM0EAAA==\",\"keep_alive\":\"5m\"},\"pit\":{\"id\":\"uMyMBAEJdGVzdF9kYXRhFlNXQUpCaWgwUnZlb3RxMUU2WTdMM0EAFkFzbUpDR2R4Uy1XT05uWnVHYjZYOVEAAAAAAAAAFkQWT0FwS2x3RWhUbi12a1U0aWtvckVLUQABFlNXQUpCaWgwUnZlb3RxMUU2WTdMM0EAAA==\",\"keep_alive\":\"1m\"}}\"; line: 1, column: 250]"}},"status":400}", :cause=>nil}
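Pretty-printing the request body quoted in that error makes the problem visible: the body contains the pit field twice, first the one from my query (keep_alive 5m) and then one the plugin appended itself, with a fresh id and keep_alive 1m (PIT ids elided here, they are the two ids in the error above):

{
  "size": 10000,
  "sort": [ {"last_updated": "asc"} ],
  "pit": { "id": "<PIT id from my query>", "keep_alive": "5m" },
  "pit": { "id": "<PIT id created by the plugin>", "keep_alive": "1m" }
}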
I also saw that Logstash creates a PIT by itself and closes it once pagination is over:
[2024-06-08T20:36:18,345][INFO ][logstash.inputs.elasticsearch.searchafter][main][dda1999ccd9a63f515e4483e9dbccd8ab077af231b1eef6540b0495b4ab7228b] Create point in time (PIT)
How can I use a PIT with Logstash in order to build this resilience mechanism?