I am running an Actix Web server, and from my Java client I push load on 10 parallel threads at 1,000 TPS each. Each transaction takes about 4 ms, except for sudden spikes like the one shown in the logs below.
There are no index writes happening while searching; I load all my documents at startup.
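For reference, the startup load is just a bulk add per key through the engine shown below. This is only a rough sketch of that call; docs_for and the key names are placeholders, not my real code:

fn load_all_documents_at_startup() -> tantivy::Result<()> {
    // "key_a"/"key_b" and docs_for are placeholders for however the real
    // (title, body) pairs are read; SEARCH_ENGINE is the static defined below.
    for key in ["key_a", "key_b"] {
        let docs: Vec<(String, String)> = docs_for(key);
        let load_time = SEARCH_ENGINE.add_documents(key, &docs)?;
        log::info!("Loaded {} docs for key {} in {:?}", docs.len(), key, load_time);
    }
    Ok(())
}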
Logs
"Response total" in the logs is the time taken for the search, in milliseconds.
esme::handlers::submit_sm::src/handlers/submit_sm.rs:64 - Response total : 6
2024-08-06 18:01:08 [actix-rt|system:0|arbiter:7] INFO - esme::handlers::submit_sm::src/handlers/submit_sm.rs:64 - Response total : 4
2024-08-06 18:01:08 [actix-rt|system:0|arbiter:1] INFO - esme::handlers::submit_sm::src/handlers/submit_sm.rs:64 - Response total : 4
2024-08-06 18:01:08 [actix-rt|system:0|arbiter:7] INFO - esme::handlers::submit_sm::src/handlers/submit_sm.rs:64 - Response total : 4
2024-08-06 18:01:08 [actix-rt|system:0|arbiter:1] INFO - esme::handlers::submit_sm::src/handlers/submit_sm.rs:64 - Response total : 4
2024-08-06 18:01:08 [actix-rt|system:0|arbiter:7] INFO - esme::handlers::submit_sm::src/handlers/submit_sm.rs:64 - Response total : 5
2024-08-06 18:01:08 [actix-rt|system:0|arbiter:1] INFO - esme::handlers::submit_sm::src/handlers/submit_sm.rs:64 - Response total : 6
2024-08-06 18:01:08 [actix-rt|system:0|arbiter:7] INFO - esme::handlers::submit_sm::src/handlers/submit_sm.rs:64 - Response total : 4
2024-08-06 18:01:08 [actix-rt|system:0|arbiter:0] INFO - esme::handlers::submit_sm::src/handlers/submit_sm.rs:64 - Response total : 107
2024-08-06 18:01:08 [actix-rt|system:0|arbiter:2] INFO - esme::handlers::submit_sm::src/handlers/submit_sm.rs:64 - Response total : 107
2024-08-06 18:01:08 [actix-rt|system:0|arbiter:1] INFO - esme::handlers::submit_sm::src/handlers/submit_sm.rs:64 - Response total : 7
2024-08-06 18:01:08 [actix-rt|system:0|arbiter:7] INFO - esme::handlers::submit_sm::src/handlers/submit_sm.rs:64 - Response total : 7
2024-08-06 18:01:08 [actix-rt|system:0|arbiter:0] INFO - esme::handlers::submit_sm::src/handlers/submit_sm.rs:64 - Response total : 8
2024-08-06 18:01:09 [actix-rt|system:0|arbiter:7] INFO - esme::handlers::submit_sm::src/handlers/submit_sm.rs:64 - Response total : 4
2024-08-06 18:01:09 [actix-rt|system:0|arbiter:1] INFO - esme::handlers::submit_sm::src/handlers/submit_sm.rs:64 - Response total : 7
2024-08-06 18:01:09 [actix-rt|system:0|arbiter:7] INFO - esme::handlers::submit_sm::src/handlers/submit_sm.rs:64 - Response total : 3
2024-08-06 18:01:09 [actix-rt|system:0|arbiter:0] INFO - esme::handlers::submit_sm::src/handlers/submit_sm.rs:64 - Response total : 6
There is a sudden spike from about 4 ms to over 100 ms (107 ms in the logs above).
Below is my code: first the Tantivy wrapper module, then the Actix handler.
// Tantivy wrapper module (crate::tant::tantivy_lib)
use tantivy::schema::*;
use tantivy::{Index, IndexWriter, TantivyDocument};
use tantivy::query::QueryParser;
use tantivy::collector::TopDocs;
use std::sync::{Mutex, RwLock, Arc};
use std::collections::HashMap;
use lazy_static::lazy_static;
use std::time::{Duration, Instant};

pub struct MySearchEngine {
    indexes: RwLock<HashMap<String, Arc<SingleIndex>>>,
}

pub struct SingleIndex {
    index: Index,
    writer: Mutex<IndexWriter>,
}

impl SingleIndex {
    fn new() -> tantivy::Result<Self> {
        let mut schema_builder = Schema::builder();
        let _title = schema_builder.add_text_field("title", TEXT | STORED);
        let _body = schema_builder.add_text_field("body", TEXT | STORED);
        let schema = schema_builder.build();
        let index = Index::create_in_ram(schema.clone());
        let writer = index.writer(50_000_000)?;
        Ok(SingleIndex {
            index,
            writer: Mutex::new(writer),
        })
    }
    fn add_document(&self, title: &str, body: &str) -> tantivy::Result<Duration> {
        let start = Instant::now();
        let mut writer = self.writer.lock().unwrap();
        let mut doc = TantivyDocument::default();
        doc.add_text(self.index.schema().get_field("title").unwrap(), title);
        doc.add_text(self.index.schema().get_field("body").unwrap(), body);
        writer.add_document(doc)?;
        writer.commit()?;
        let duration = start.elapsed();
        Ok(duration)
    }

    fn add_documents(&self, documents: &Vec<(String, String)>) -> tantivy::Result<Duration> {
        let start = Instant::now();
        let mut writer = self.writer.lock().unwrap();
        for (title, body) in documents {
            let mut doc = TantivyDocument::default();
            doc.add_text(self.index.schema().get_field("title").unwrap(), title);
            doc.add_text(self.index.schema().get_field("body").unwrap(), body);
            writer.add_document(doc)?;
        }
        writer.commit()?;
        let duration = start.elapsed();
        Ok(duration)
    }
    fn search(&self, query_str: &str) -> tantivy::Result<(Duration, Vec<(f32, TantivyDocument)>)> {
        let start = Instant::now();
        // A new IndexReader is opened on every search call.
        let reader = self.index.reader()?;
        let searcher = reader.searcher();
        let query_parser = QueryParser::for_index(&self.index, vec![
            self.index.schema().get_field("body").unwrap(),
        ]);
        let query = query_parser.parse_query(query_str)?;
        let top_docs = searcher.search(&query, &TopDocs::with_limit(10))?;
        let mut results = Vec::new();
        for (score, doc_address) in top_docs {
            let doc = searcher.doc(doc_address)?;
            results.push((score, doc));
        }
        let duration = start.elapsed();
        Ok((duration, results))
    }
}
impl MySearchEngine {
    fn new() -> Self {
        MySearchEngine {
            indexes: RwLock::new(HashMap::new()),
        }
    }

    // Always takes the write lock, even when the index already exists.
    fn get_or_create_index(&self, key: &str) -> tantivy::Result<Arc<SingleIndex>> {
        let mut indexes = self.indexes.write().unwrap();
        if let Some(index) = indexes.get(key) {
            return Ok(Arc::clone(index));
        }
        let index = SingleIndex::new()?;
        let index = Arc::new(index);
        indexes.insert(key.to_string(), Arc::clone(&index));
        Ok(index)
    }

    pub fn add_document(&self, key: &str, title: &str, body: &str) -> tantivy::Result<Duration> {
        let index = self.get_or_create_index(key)?;
        index.add_document(title, body)
    }

    pub fn add_documents(&self, key: &str, documents: &Vec<(String, String)>) -> tantivy::Result<Duration> {
        let index = self.get_or_create_index(key)?;
        index.add_documents(documents)
    }

    pub fn search(&self, key: &str, query_str: &str) -> tantivy::Result<(Duration, Vec<(f32, TantivyDocument)>)> {
        let index = self.get_or_create_index(key)?;
        index.search(query_str)
    }
}
lazy_static! {
    pub static ref SEARCH_ENGINE: Arc<MySearchEngine> = {
        let engine = MySearchEngine::new();
        Arc::new(engine)
    };
}

// Actix handler (src/handlers/submit_sm.rs)
use actix_web::{post, web, Responder, HttpResponse};
use serde::{Deserialize, Serialize};
use log::info;
use chrono::Utc;
use std::time::{Duration, Instant};
use regex::Regex;
use tantivy::schema::Value;
use crate::tant::tantivy_lib::SEARCH_ENGINE;
use crate::utils::transaction_id::generate_transaction_id;

// Define data structures
#[derive(Debug, Deserialize, Serialize)]
pub struct SubmitSm {
    #[serde(default = "default_transaction_id")]
    pub transaction_id: i32,
    pub key: String,
    #[serde(default = "default_field2")]
    pub query: String,
}

#[derive(Debug, Serialize)]
pub struct ResponseSm {
    pub timestamp: String, // Use String to store the timestamp in ISO 8601 format
    pub transaction_id: i32,
    pub time_taken: u128,
    pub status: String,
    pub response: Vec<(f32, tantivy::TantivyDocument)>,
}

fn default_transaction_id() -> i32 {
    // Generate a unique transaction ID
    generate_transaction_id() // Use the function from the module
}

fn default_field2() -> String {
    "default query".to_string() // Default value for the query field
}
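// Example of the JSON body the Java client posts (illustrative values only;
// transaction_id and query fall back to the defaults above when they are omitted):
//   { "transaction_id": 1, "key": "index-a", "query": "hello" }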
// Define the handler function
#[post("/esme/http_adapter")]
pub async fn http_adapter(item: web::Json<SubmitSm>) -> impl Responder {
    let start = Instant::now();
    let data: SubmitSm = item.into_inner();
    info!("Received POST request to /esme/http_adapter with data: {:?}", data);

    // Create the response with current timestamp and the transaction_id from SubmitSm
    let start1 = Instant::now();
    let search_engine = SEARCH_ENGINE.clone();
    let duration1 = start1.elapsed().as_millis();
    let (duration, results) = search_engine.search(&data.key, &data.query).unwrap(); // Destructure the tuple

    //for (score, doc) in &results {
    //    info!("key: {:?} = value: {:?}", doc.field_values().get(0).unwrap().value(), doc.field_values().get(1).unwrap().value().as_str().unwrap());
    //}
    //info!("Received POST results to : {:?}", results);

    let response = ResponseSm {
        timestamp: Utc::now().to_rfc3339(), // Get current UTC timestamp in RFC 3339 format
        transaction_id: data.transaction_id, // Include the transaction ID
        status: "Success".to_string(),
        time_taken: start.elapsed().as_millis(),
        response: results, // Directly use results without cloning
    };
    info!("Response total : {:?} ", start.elapsed().as_millis());
    HttpResponse::Ok().json(response) // Return the response as JSON
}
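To narrow down where the extra ~100 ms goes, I can time each stage of search() separately (reader creation, query parsing, the search itself, and fetching the stored documents). Below is only a rough sketch of that instrumentation, assuming it sits in the same module as SingleIndex so it can use the same imports and private fields; search_instrumented is just an illustrative name:

fn search_instrumented(idx: &SingleIndex, query_str: &str) -> tantivy::Result<()> {
    let t0 = Instant::now();

    // Stage 1: reader creation (search() opens a new IndexReader per call).
    let reader = idx.index.reader()?;
    let t_reader = t0.elapsed();

    // Stage 2: query parsing against the "body" field.
    let searcher = reader.searcher();
    let parser = QueryParser::for_index(
        &idx.index,
        vec![idx.index.schema().get_field("body").unwrap()],
    );
    let query = parser.parse_query(query_str)?;
    let t_parse = t0.elapsed();

    // Stage 3: the actual top-10 search.
    let top_docs = searcher.search(&query, &TopDocs::with_limit(10))?;
    let t_search = t0.elapsed();

    // Stage 4: fetching the stored documents.
    for (_score, addr) in top_docs {
        let _doc: TantivyDocument = searcher.doc(addr)?;
    }
    let t_fetch = t0.elapsed();

    log::info!(
        "reader: {:?}, parse: {:?}, search: {:?}, fetch: {:?}",
        t_reader,
        t_parse - t_reader,
        t_search - t_parse,
        t_fetch - t_search
    );
    Ok(())
}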
How do I fix this spike?