Skip to content

Commit 5cbad30

Browse files
bors[bot]irevoirebidoubiwa
authored
Merge #417
417: Provide two methods to add documents from an async reader r=bidoubiwa a=irevoire # Pull Request When using Meilisearch-rust to forward documents from another source it’s a pain to use. These two simple methods let you send raw payload to meilisearch. But you **need** to specify your content-type. ---- ## Note I didn’t implements these method for the wasm version of the crate because `wasm-bindgen-futures` don’t provide an `AsyncReader` trait. Co-authored-by: Tamo <[email protected]> Co-authored-by: Charlotte Vermandel <[email protected]>
2 parents a29ea75 + 148b2a2 commit 5cbad30

File tree

3 files changed

+229
-0
lines changed

3 files changed

+229
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ meilisearch-index-setting-macro = { path = "meilisearch-index-setting-macro", ve
2727

2828
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
2929
futures = "0.3"
30+
futures-io = "0.3.26"
3031
isahc = { version = "1.0", features = ["http2", "text-decoding"], default_features = false }
3132
uuid = { version = "1.1.2", features = ["v4"] }
3233

src/indexes.rs

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,73 @@ impl Index {
559559
.await
560560
}
561561

562+
/// Add a raw and unchecked payload to meilisearch.
563+
/// This can be useful if your application is only forwarding data from other sources.
564+
///
565+
/// If you send an already existing document (same id) the **whole existing document** will be overwritten by the new document.
566+
/// Fields previously in the document not present in the new document are removed.
567+
///
568+
/// For a partial update of the document see [Index::add_or_update_unchecked_payload].
569+
///
570+
/// # Example
571+
///
572+
/// ```
573+
/// use serde::{Serialize, Deserialize};
574+
///
575+
/// # use meilisearch_sdk::{client::*, indexes::*};
576+
/// # use std::thread::sleep;
577+
/// # use std::time::Duration;
578+
/// #
579+
/// # let MEILISEARCH_URL = option_env!("MEILISEARCH_URL").unwrap_or("http://localhost:7700");
580+
/// # let MEILISEARCH_API_KEY = option_env!("MEILISEARCH_API_KEY").unwrap_or("masterKey");
581+
/// # futures::executor::block_on(async move {
582+
/// let client = Client::new(MEILISEARCH_URL, MEILISEARCH_API_KEY);
583+
/// let movie_index = client.index("add_or_replace_unchecked_payload");
584+
///
585+
/// let task = movie_index.add_or_replace_unchecked_payload(
586+
/// r#"{ "id": 1, "body": "doggo" }
587+
/// { "id": 2, "body": "catto" }"#.as_bytes(),
588+
/// "application/x-ndjson",
589+
/// Some("id"),
590+
/// ).await.unwrap();
591+
/// // Meilisearch may take some time to execute the request so we are going to wait till it's completed
592+
/// client.wait_for_task(task, None, None).await.unwrap();
593+
///
594+
/// let movies = movie_index.get_documents::<serde_json::Value>().await.unwrap();
595+
/// assert!(movies.results.len() == 2);
596+
/// # movie_index.delete().await.unwrap().wait_for_completion(&client, None, None).await.unwrap();
597+
/// # });
598+
/// ```
599+
#[cfg(not(target_arch = "wasm32"))]
600+
pub async fn add_or_replace_unchecked_payload<
601+
T: futures_io::AsyncRead + Send + Sync + 'static,
602+
>(
603+
&self,
604+
payload: T,
605+
content_type: &str,
606+
primary_key: Option<&str>,
607+
) -> Result<TaskInfo, Error> {
608+
let url = if let Some(primary_key) = primary_key {
609+
format!(
610+
"{}/indexes/{}/documents?primaryKey={}",
611+
self.client.host, self.uid, primary_key
612+
)
613+
} else {
614+
format!("{}/indexes/{}/documents", self.client.host, self.uid)
615+
};
616+
stream_request::<(), T, TaskInfo>(
617+
&url,
618+
&self.client.api_key,
619+
Method::Post {
620+
query: (),
621+
body: payload,
622+
},
623+
content_type,
624+
202,
625+
)
626+
.await
627+
}
628+
562629
/// Alias for [Index::add_or_replace].
563630
pub async fn add_documents<T: Serialize>(
564631
&self,
@@ -648,6 +715,73 @@ impl Index {
648715
.await
649716
}
650717

718+
/// Add a raw and unchecked payload to meilisearch.
719+
/// This can be useful if your application is only forwarding data from other sources.
720+
///
721+
/// If you send an already existing document (same id) the old document will be only partially updated according to the fields of the new document.
722+
/// Thus, any fields not present in the new document are kept and remained unchanged.
723+
///
724+
/// To completely overwrite a document, check out the [Index::add_or_replace_unchecked_payload] documents method.
725+
///
726+
/// # Example
727+
///
728+
/// ```
729+
/// use serde::{Serialize, Deserialize};
730+
///
731+
/// # use meilisearch_sdk::{client::*, indexes::*};
732+
/// # use std::thread::sleep;
733+
/// # use std::time::Duration;
734+
/// #
735+
/// # let MEILISEARCH_URL = option_env!("MEILISEARCH_URL").unwrap_or("http://localhost:7700");
736+
/// # let MEILISEARCH_API_KEY = option_env!("MEILISEARCH_API_KEY").unwrap_or("masterKey");
737+
/// # futures::executor::block_on(async move {
738+
/// let client = Client::new(MEILISEARCH_URL, MEILISEARCH_API_KEY);
739+
/// let movie_index = client.index("add_or_replace_unchecked_payload");
740+
///
741+
/// let task = movie_index.add_or_update_unchecked_payload(
742+
/// r#"{ "id": 1, "body": "doggo" }
743+
/// { "id": 2, "body": "catto" }"#.as_bytes(),
744+
/// "application/x-ndjson",
745+
/// Some("id"),
746+
/// ).await.unwrap();
747+
/// // Meilisearch may take some time to execute the request so we are going to wait till it's completed
748+
/// client.wait_for_task(task, None, None).await.unwrap();
749+
///
750+
/// let movies = movie_index.get_documents::<serde_json::Value>().await.unwrap();
751+
/// assert!(movies.results.len() == 2);
752+
/// # movie_index.delete().await.unwrap().wait_for_completion(&client, None, None).await.unwrap();
753+
/// # });
754+
/// ```
755+
#[cfg(not(target_arch = "wasm32"))]
756+
pub async fn add_or_update_unchecked_payload<
757+
T: futures_io::AsyncRead + Send + Sync + 'static,
758+
>(
759+
&self,
760+
payload: T,
761+
content_type: &str,
762+
primary_key: Option<&str>,
763+
) -> Result<TaskInfo, Error> {
764+
let url = if let Some(primary_key) = primary_key {
765+
format!(
766+
"{}/indexes/{}/documents?primaryKey={}",
767+
self.client.host, self.uid, primary_key
768+
)
769+
} else {
770+
format!("{}/indexes/{}/documents", self.client.host, self.uid)
771+
};
772+
stream_request::<(), T, TaskInfo>(
773+
&url,
774+
&self.client.api_key,
775+
Method::Put {
776+
query: (),
777+
body: payload,
778+
},
779+
content_type,
780+
202,
781+
)
782+
.await
783+
}
784+
651785
/// Delete all documents in the index.
652786
///
653787
/// # Example

src/request.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,100 @@ pub(crate) async fn request<
115115
parse_response(status, expected_status_code, body)
116116
}
117117

118+
#[cfg(not(target_arch = "wasm32"))]
119+
pub(crate) async fn stream_request<
120+
'a,
121+
Query: Serialize,
122+
Body: futures_io::AsyncRead + Send + Sync + 'static,
123+
Output: DeserializeOwned + 'static,
124+
>(
125+
url: &str,
126+
apikey: &str,
127+
method: Method<Query, Body>,
128+
content_type: &str,
129+
expected_status_code: u16,
130+
) -> Result<Output, Error> {
131+
use isahc::http::header;
132+
use isahc::*;
133+
134+
let auth = format!("Bearer {apikey}");
135+
let user_agent = qualified_version();
136+
137+
let mut response = match method {
138+
Method::Get { query } => {
139+
let url = add_query_parameters(url, &query)?;
140+
141+
Request::get(url)
142+
.header(header::AUTHORIZATION, auth)
143+
.header(header::USER_AGENT, user_agent)
144+
.body(())
145+
.map_err(|_| crate::errors::Error::InvalidRequest)?
146+
.send_async()
147+
.await?
148+
}
149+
Method::Delete { query } => {
150+
let url = add_query_parameters(url, &query)?;
151+
152+
Request::delete(url)
153+
.header(header::AUTHORIZATION, auth)
154+
.header(header::USER_AGENT, user_agent)
155+
.body(())
156+
.map_err(|_| crate::errors::Error::InvalidRequest)?
157+
.send_async()
158+
.await?
159+
}
160+
Method::Post { query, body } => {
161+
let url = add_query_parameters(url, &query)?;
162+
163+
Request::post(url)
164+
.header(header::AUTHORIZATION, auth)
165+
.header(header::USER_AGENT, user_agent)
166+
.header(header::CONTENT_TYPE, content_type)
167+
.body(AsyncBody::from_reader(body))
168+
.map_err(|_| crate::errors::Error::InvalidRequest)?
169+
.send_async()
170+
.await?
171+
}
172+
Method::Patch { query, body } => {
173+
let url = add_query_parameters(url, &query)?;
174+
175+
Request::patch(url)
176+
.header(header::AUTHORIZATION, auth)
177+
.header(header::USER_AGENT, user_agent)
178+
.header(header::CONTENT_TYPE, content_type)
179+
.body(AsyncBody::from_reader(body))
180+
.map_err(|_| crate::errors::Error::InvalidRequest)?
181+
.send_async()
182+
.await?
183+
}
184+
Method::Put { query, body } => {
185+
let url = add_query_parameters(url, &query)?;
186+
187+
Request::put(url)
188+
.header(header::AUTHORIZATION, auth)
189+
.header(header::USER_AGENT, user_agent)
190+
.header(header::CONTENT_TYPE, content_type)
191+
.body(AsyncBody::from_reader(body))
192+
.map_err(|_| crate::errors::Error::InvalidRequest)?
193+
.send_async()
194+
.await?
195+
}
196+
};
197+
198+
let status = response.status().as_u16();
199+
200+
let mut body = response
201+
.text()
202+
.await
203+
.map_err(|e| crate::errors::Error::HttpError(e.into()))?;
204+
205+
if body.is_empty() {
206+
body = "null".to_string();
207+
}
208+
209+
parse_response(status, expected_status_code, body)
210+
}
211+
118212
#[cfg(target_arch = "wasm32")]
119213
pub fn add_query_parameters<Query: Serialize>(
120214
mut url: String,

0 commit comments

Comments
 (0)