Skip to content

Commit 241e797

Browse files
meili-bors[bot]carlosb1bidoubiwa
authored
Merge #494
494: Added function for sending NDJSON documents r=bidoubiwa a=carlosb1 # Pull Request ## Related issue Fixes #191 ## What does this PR do? Added the first two functions (with their tests) for issue #191. It includes two functions for sending njdson payloads - ` add_documents_ndjson` - `update_documents_ndjson` ## PR checklist Please check if your PR fulfills the following requirements: - [X] Does this PR fix an existing issue, or have you listed the changes applied in the PR description (and why they are needed)? - [X] Have you read the contributing guidelines? - [X] Have you made sure that the title is accurate and descriptive of the changes? Thank you so much for contributing to Meilisearch! Co-authored-by: carlosb1 <[email protected]> Co-authored-by: cvermand <[email protected]>
2 parents daeb24e + 9193fa4 commit 241e797

File tree

1 file changed

+146
-0
lines changed

1 file changed

+146
-0
lines changed

src/indexes.rs

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -661,6 +661,98 @@ impl Index {
661661
self.add_or_replace(documents, primary_key).await
662662
}
663663

664+
/// Add a raw ndjson payload and update them if they already.
665+
///
666+
/// It configures the correct content type for ndjson data.
667+
///
668+
/// If you send an already existing document (same id) the old document will be only partially updated according to the fields of the new document.
669+
/// Thus, any fields not present in the new document are kept and remained unchanged.
670+
///
671+
/// To completely overwrite a document, check out the [`Index::add_documents_ndjson`] documents method.
672+
///
673+
/// # Example
674+
///
675+
/// ```
676+
/// # use serde::{Serialize, Deserialize};
677+
/// # use meilisearch_sdk::{client::*, indexes::*};
678+
/// # use std::thread::sleep;
679+
/// # use std::time::Duration;
680+
/// #
681+
/// # let MEILISEARCH_URL = option_env!("MEILISEARCH_URL").unwrap_or("http://localhost:7700");
682+
/// # let MEILISEARCH_API_KEY = option_env!("MEILISEARCH_API_KEY").unwrap_or("masterKey");
683+
/// # futures::executor::block_on(async move {
684+
/// # let client = Client::new(MEILISEARCH_URL, Some(MEILISEARCH_API_KEY));
685+
/// let movie_index = client.index("update_documents_ndjson");
686+
///
687+
/// let task = movie_index.update_documents_ndjson(
688+
/// r#"{ "id": 1, "body": "doggo" }
689+
/// { "id": 2, "body": "catto" }"#.as_bytes(),
690+
/// Some("id"),
691+
/// ).await.unwrap();
692+
/// // Meilisearch may take some time to execute the request so we are going to wait till it's completed
693+
/// client.wait_for_task(task, None, None).await.unwrap();
694+
///
695+
/// let movies = movie_index.get_documents::<serde_json::Value>().await.unwrap();
696+
/// assert!(movies.results.len() == 2);
697+
/// # movie_index.delete().await.unwrap().wait_for_completion(&client, None, None).await.unwrap();
698+
/// # });
699+
/// ```
700+
#[cfg(not(target_arch = "wasm32"))]
701+
pub async fn update_documents_ndjson<T: futures_io::AsyncRead + Send + Sync + 'static>(
702+
&self,
703+
payload: T,
704+
primary_key: Option<&str>,
705+
) -> Result<TaskInfo, Error> {
706+
self.add_or_update_unchecked_payload(payload, "application/x-ndjson", primary_key)
707+
.await
708+
}
709+
710+
/// Add a raw ndjson payload to meilisearch.
711+
///
712+
/// It configures the correct content type for ndjson data.
713+
///
714+
/// If you send an already existing document (same id) the **whole existing document** will be overwritten by the new document.
715+
/// Fields previously in the document not present in the new document are removed.
716+
///
717+
/// For a partial update of the document see [`Index::update_documents_ndjson`].
718+
///
719+
/// # Example
720+
///
721+
/// ```
722+
/// # use serde::{Serialize, Deserialize};
723+
/// # use meilisearch_sdk::{client::*, indexes::*};
724+
/// # use std::thread::sleep;
725+
/// # use std::time::Duration;
726+
/// #
727+
/// # let MEILISEARCH_URL = option_env!("MEILISEARCH_URL").unwrap_or("http://localhost:7700");
728+
/// # let MEILISEARCH_API_KEY = option_env!("MEILISEARCH_API_KEY").unwrap_or("masterKey");
729+
/// # futures::executor::block_on(async move {
730+
/// # let client = Client::new(MEILISEARCH_URL, Some(MEILISEARCH_API_KEY));
731+
/// let movie_index = client.index("add_documents_ndjson");
732+
///
733+
/// let task = movie_index.add_documents_ndjson(
734+
/// r#"{ "id": 1, "body": "doggo" }
735+
/// { "id": 2, "body": "catto" }"#.as_bytes(),
736+
/// Some("id"),
737+
/// ).await.unwrap();
738+
/// // Meilisearch may take some time to execute the request so we are going to wait till it's completed
739+
/// client.wait_for_task(task, None, None).await.unwrap();
740+
///
741+
/// let movies = movie_index.get_documents::<serde_json::Value>().await.unwrap();
742+
/// assert!(movies.results.len() == 2);
743+
/// # movie_index.delete().await.unwrap().wait_for_completion(&client, None, None).await.unwrap();
744+
/// # });
745+
/// ```
746+
#[cfg(not(target_arch = "wasm32"))]
747+
pub async fn add_documents_ndjson<T: futures_io::AsyncRead + Send + Sync + 'static>(
748+
&self,
749+
payload: T,
750+
primary_key: Option<&str>,
751+
) -> Result<TaskInfo, Error> {
752+
self.add_or_replace_unchecked_payload(payload, "application/x-ndjson", primary_key)
753+
.await
754+
}
755+
664756
/// Add a list of documents and update them if they already.
665757
///
666758
/// If you send an already existing document (same id) the old document will be only partially updated according to the fields of the new document.
@@ -1861,6 +1953,60 @@ mod tests {
18611953
assert_eq!(res.offset, 2);
18621954
}
18631955

1956+
#[meilisearch_test]
1957+
async fn test_add_documents_ndjson(client: Client, index: Index) -> Result<(), Error> {
1958+
let ndjson = r#"{ "id": 1, "body": "doggo" }{ "id": 2, "body": "catto" }"#.as_bytes();
1959+
1960+
let task = index
1961+
.add_documents_ndjson(ndjson, Some("id"))
1962+
.await?
1963+
.wait_for_completion(&client, None, None)
1964+
.await?;
1965+
1966+
let status = index.get_task(task).await?;
1967+
let elements = index.get_documents::<serde_json::Value>().await.unwrap();
1968+
assert!(matches!(status, Task::Succeeded { .. }));
1969+
assert!(elements.results.len() == 2);
1970+
1971+
Ok(())
1972+
}
1973+
1974+
#[meilisearch_test]
1975+
async fn test_update_documents_ndjson(client: Client, index: Index) -> Result<(), Error> {
1976+
let old_ndjson = r#"{ "id": 1, "body": "doggo" }{ "id": 2, "body": "catto" }"#.as_bytes();
1977+
let updated_ndjson =
1978+
r#"{ "id": 1, "second_body": "second_doggo" }{ "id": 2, "second_body": "second_catto" }"#.as_bytes();
1979+
// Add first njdson document
1980+
let task = index
1981+
.add_documents_ndjson(old_ndjson, Some("id"))
1982+
.await?
1983+
.wait_for_completion(&client, None, None)
1984+
.await?;
1985+
let _ = index.get_task(task).await?;
1986+
1987+
// Update via njdson document
1988+
let task = index
1989+
.update_documents_ndjson(updated_ndjson, Some("id"))
1990+
.await?
1991+
.wait_for_completion(&client, None, None)
1992+
.await?;
1993+
1994+
let status = index.get_task(task).await?;
1995+
let elements = index.get_documents::<serde_json::Value>().await.unwrap();
1996+
1997+
assert!(matches!(status, Task::Succeeded { .. }));
1998+
assert!(elements.results.len() == 2);
1999+
2000+
let expected_result = vec![
2001+
json!( {"body": "doggo", "id": 1, "second_body": "second_doggo"}),
2002+
json!( {"body": "catto", "id": 2, "second_body": "second_catto"}),
2003+
];
2004+
2005+
assert_eq!(elements.results, expected_result);
2006+
2007+
Ok(())
2008+
}
2009+
18642010
#[meilisearch_test]
18652011
async fn test_get_one_task(client: Client, index: Index) -> Result<(), Error> {
18662012
let task = index

0 commit comments

Comments
 (0)