Skip to content

Commit 60acdf6

Browse files
Sujay JayakarConvex, Inc.
authored andcommitted
Wire in schema and index changes to {start,finish}_push (#26839)
GitOrigin-RevId: d479d6d401bdd9e078eee37ce25217224630be53
1 parent d4724e9 commit 60acdf6

File tree

21 files changed

+633
-160
lines changed

21 files changed

+633
-160
lines changed

crates/application/src/snapshot_import.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2182,7 +2182,11 @@ async fn prepare_table_for_import<RT: Runtime>(
21822182
)
21832183
.await?;
21842184
IndexModel::new(tx)
2185-
.copy_indexes_to_table(table_name, table_id.tablet_id)
2185+
.copy_indexes_to_table(
2186+
TableNamespace::by_component_TODO(),
2187+
table_name,
2188+
table_id.tablet_id,
2189+
)
21862190
.await?;
21872191
Ok(table_id)
21882192
}
@@ -3007,10 +3011,10 @@ a
30073011
let mut tx = app.begin(identity.clone()).await?;
30083012
let mut index_model = IndexModel::new(&mut tx);
30093013
let index_id = index_model
3010-
.add_application_index(IndexMetadata::new_enabled(
3011-
index_name.clone(),
3012-
vec!["a".parse()?].try_into()?,
3013-
))
3014+
.add_application_index(
3015+
TableNamespace::test_user(),
3016+
IndexMetadata::new_enabled(index_name.clone(), vec!["a".parse()?].try_into()?),
3017+
)
30143018
.await?;
30153019
app.commit_test(tx).await?;
30163020
index_id

crates/database/src/bootstrap_model/index.rs

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ use indexing::{
6161
Index,
6262
},
6363
};
64-
use itertools::Itertools;
6564
use value::{
6665
ResolvedDocumentId,
6766
TableMapping,
@@ -113,14 +112,15 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
113112
/// transaction has committed.
114113
pub async fn add_application_index(
115114
&mut self,
115+
namespace: TableNamespace,
116116
index: IndexMetadata<TableName>,
117117
) -> anyhow::Result<ResolvedDocumentId> {
118118
anyhow::ensure!(
119119
self.tx.identity().is_admin() || self.tx.identity().is_system(),
120120
unauthorized_error("add_index")
121121
);
122122
anyhow::ensure!(!index.name.is_system_owned(), "Can't change system indexes");
123-
let application_indexes = self.get_application_indexes().await?;
123+
let application_indexes = self.get_application_indexes(namespace).await?;
124124
// Indexes may exist in both a pending and an enabled state. If we're at or over
125125
// the index limit, we should still be able to add a new pending copy of
126126
// an enabled index with the expectation that the pending index will
@@ -137,8 +137,7 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
137137
|| index_names_in_table.len() < MAX_INDEXES_PER_TABLE,
138138
index_validation_error::too_many_indexes(index.name.table(), MAX_INDEXES_PER_TABLE)
139139
);
140-
self._add_index(TableNamespace::by_component_TODO(), index)
141-
.await
140+
self._add_index(namespace, index).await
142141
}
143142

144143
/// Add system index.
@@ -281,7 +280,7 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
281280
.await?;
282281

283282
tracing::info!(
284-
"Committed indexes: (added {}. dropped {})",
283+
"Committed indexes for {namespace:?}: (added {}. dropped {})",
285284
index_diff.added.len(),
286285
index_diff.dropped.len(),
287286
);
@@ -312,7 +311,8 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
312311
added: vec![],
313312
dropped: dropped.clone(),
314313
};
315-
self.apply_index_diff(&only_dropped_tables).await?;
314+
self.apply_index_diff(namespace, &only_dropped_tables)
315+
.await?;
316316

317317
// Added indexes should have backfilled via build_indexes
318318
// (for < 0.14.0 CLIs) or in apply_config (for >= 0.14.0 CLIs).
@@ -339,15 +339,19 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
339339
Ok(())
340340
}
341341

342-
pub async fn apply_index_diff(&mut self, diff: &LegacyIndexDiff) -> anyhow::Result<()> {
342+
pub async fn apply_index_diff(
343+
&mut self,
344+
namespace: TableNamespace,
345+
diff: &LegacyIndexDiff,
346+
) -> anyhow::Result<()> {
343347
if !(self.tx.identity().is_admin() || self.tx.identity().is_system()) {
344348
anyhow::bail!(unauthorized_error("modify_indexes"));
345349
}
346350
for index in &diff.dropped {
347351
self.drop_index(index.id()).await?;
348352
}
349353
for index in &diff.added {
350-
self.add_application_index(index.clone()).await?;
354+
self.add_application_index(namespace, index.clone()).await?;
351355
}
352356

353357
Ok(())
@@ -398,7 +402,7 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
398402

399403
let mut remaining_indexes: HashMap<IndexName, Vec<ParsedDocument<DeveloperIndexMetadata>>> =
400404
HashMap::new();
401-
for index in self.get_application_indexes().await? {
405+
for index in self.get_application_indexes(namespace).await? {
402406
remaining_indexes
403407
.entry(index.name.clone())
404408
.or_default()
@@ -513,7 +517,8 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
513517
// Dropped will be removed in apply_config when the rest of the schema is
514518
// committed.
515519
if !only_new_and_mutated.is_empty() {
516-
self.apply_index_diff(&only_new_and_mutated).await?;
520+
self.apply_index_diff(namespace, &only_new_and_mutated)
521+
.await?;
517522
}
518523
Ok(diff)
519524
}
@@ -527,7 +532,7 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
527532
if diff.is_empty() {
528533
return Ok(diff);
529534
}
530-
self.apply_index_diff(&diff).await?;
535+
self.apply_index_diff(namespace, &diff).await?;
531536
Ok(diff)
532537
}
533538

@@ -775,8 +780,9 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
775780
/// name (but different configurations).
776781
pub async fn get_system_indexes(
777782
&mut self,
783+
namespace: TableNamespace,
778784
) -> anyhow::Result<Vec<ParsedDocument<DeveloperIndexMetadata>>> {
779-
self.get_indexes(IndexCategory::System).await
785+
self.get_indexes(IndexCategory::System, namespace).await
780786
}
781787

782788
/// Returns all registered indexes that aren't system owned including both
@@ -786,23 +792,32 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
786792
/// name (but different configurations).
787793
pub async fn get_application_indexes(
788794
&mut self,
795+
namespace: TableNamespace,
789796
) -> anyhow::Result<Vec<ParsedDocument<DeveloperIndexMetadata>>> {
790-
self.get_indexes(IndexCategory::Application).await
797+
self.get_indexes(IndexCategory::Application, namespace)
798+
.await
791799
}
792800

793801
async fn get_indexes(
794802
&mut self,
795803
category: IndexCategory,
804+
namespace: TableNamespace,
796805
) -> anyhow::Result<Vec<ParsedDocument<DeveloperIndexMetadata>>> {
797-
let indexes_with_id = self.get_all_indexes().await?;
806+
let indexes = self.get_all_indexes().await?;
798807
let table_mapping = self.tx.table_mapping();
799-
indexes_with_id
800-
.into_iter()
801-
.filter(|doc| category.belongs(doc, table_mapping))
802-
.map(|doc: ParsedDocument<TabletIndexMetadata>| {
803-
doc.map(|metadata| metadata.map_table(&table_mapping.tablet_to_name()))
804-
})
805-
.try_collect()
808+
let mut result = vec![];
809+
for doc in indexes {
810+
if !category.belongs(&doc, table_mapping) {
811+
continue;
812+
}
813+
let tablet_id = *doc.name.table();
814+
if table_mapping.tablet_namespace(tablet_id)? != namespace {
815+
continue;
816+
}
817+
let doc = doc.map(|metadata| metadata.map_table(&table_mapping.tablet_to_name()))?;
818+
result.push(doc);
819+
}
820+
Ok(result)
806821
}
807822

808823
pub async fn drop_index(&mut self, index_id: ResolvedDocumentId) -> anyhow::Result<()> {
@@ -812,10 +827,14 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
812827
Ok(())
813828
}
814829

815-
pub async fn drop_system_index(&mut self, index_name: IndexName) -> anyhow::Result<()> {
830+
pub async fn drop_system_index(
831+
&mut self,
832+
namespace: TableNamespace,
833+
index_name: IndexName,
834+
) -> anyhow::Result<()> {
816835
anyhow::ensure!(index_name.table().is_system());
817836
let system_index = self
818-
.get_system_indexes()
837+
.get_system_indexes(namespace)
819838
.await?
820839
.into_iter()
821840
.find(|index| index.name == index_name);
@@ -828,14 +847,15 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
828847

829848
pub async fn copy_indexes_to_table(
830849
&mut self,
850+
namespace: TableNamespace,
831851
source_table: &TableName,
832852
target_table: TabletId,
833853
) -> anyhow::Result<()> {
834854
// Copy over enabled indexes from existing active table, if any.
835855
let Some(active_table_id) = self
836856
.tx
837857
.table_mapping()
838-
.namespace(TableNamespace::by_component_TODO())
858+
.namespace(namespace)
839859
.id_if_exists(source_table)
840860
else {
841861
return Ok(());

crates/database/src/bootstrap_model/system_metadata.rs

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ use common::{
55
};
66
use value::{
77
ConvexObject,
8+
InternalId,
89
ResolvedDocumentId,
10+
TableIdentifier,
911
TableName,
1012
TableNamespace,
13+
TabletIdAndTableNumber,
1114
};
1215

1316
use crate::{
@@ -57,8 +60,37 @@ impl<'a, RT: Runtime> SystemMetadataModel<'a, RT> {
5760
if !(self.tx.identity.is_system() || self.tx.identity.is_admin()) {
5861
anyhow::bail!(unauthorized_error("insert_metadata"));
5962
}
60-
let table_id = self
61-
.tx
63+
let table_id = self.lookup_table_id(table)?;
64+
let id = self.tx.id_generator.generate(&table_id);
65+
let creation_time = self.tx.next_creation_time.increment()?;
66+
let document = ResolvedDocument::new(id, creation_time, value)?;
67+
self.tx.insert_document(document).await
68+
}
69+
70+
pub fn allocate_internal_id(&mut self) -> anyhow::Result<InternalId> {
71+
Ok(self.tx.id_generator.generate_internal())
72+
}
73+
74+
/// Create a new document with a predetermined internal ID.
75+
pub async fn insert_with_internal_id(
76+
&mut self,
77+
table: &TableName,
78+
internal_id: InternalId,
79+
value: ConvexObject,
80+
) -> anyhow::Result<ResolvedDocumentId> {
81+
anyhow::ensure!(table.is_system());
82+
if !(self.tx.identity.is_system() || self.tx.identity.is_admin()) {
83+
anyhow::bail!(unauthorized_error("insert_metadata"));
84+
}
85+
let table_id = self.lookup_table_id(table)?;
86+
let document_id = table_id.id(internal_id);
87+
let creation_time = self.tx.next_creation_time.increment()?;
88+
let document = ResolvedDocument::new(document_id, creation_time, value)?;
89+
self.tx.insert_document(document).await
90+
}
91+
92+
fn lookup_table_id(&mut self, table: &TableName) -> anyhow::Result<TabletIdAndTableNumber> {
93+
self.tx
6294
.table_mapping()
6395
.namespace(self.namespace)
6496
.id(table)
@@ -71,11 +103,7 @@ impl<'a, RT: Runtime> SystemMetadataModel<'a, RT> {
71103
} else {
72104
format!("Failed to find system table {table}")
73105
}
74-
})?;
75-
let id = self.tx.id_generator.generate(&table_id);
76-
let creation_time = self.tx.next_creation_time.increment()?;
77-
let document = ResolvedDocument::new(id, creation_time, value)?;
78-
self.tx.insert_document(document).await
106+
})
79107
}
80108

81109
/// Creates a new document with given value in the specified table without

crates/database/src/search_and_vector_bootstrap.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -826,7 +826,7 @@ mod tests {
826826
let index_metadata = backfilling_vector_index()?;
827827
let mut tx = fixtures.db.begin_system().await?;
828828
IndexModel::new(&mut tx)
829-
.add_application_index(index_metadata.clone())
829+
.add_application_index(TableNamespace::test_user(), index_metadata.clone())
830830
.await?;
831831
fixtures.db.commit(tx).await?;
832832
reopen_db(&rt, &fixtures).await?;
@@ -842,7 +842,7 @@ mod tests {
842842
let index_metadata = backfilling_vector_index()?;
843843
let mut tx = db.begin_system().await?;
844844
IndexModel::new(&mut tx)
845-
.add_application_index(index_metadata.clone())
845+
.add_application_index(TableNamespace::test_user(), index_metadata.clone())
846846
.await?;
847847
db.commit(tx).await?;
848848
backfill_vector_indexes(rt.clone(), db.clone(), reader, storage.clone()).await?;
@@ -1188,7 +1188,7 @@ mod tests {
11881188
btreeset! {"filterField".parse()?},
11891189
);
11901190
IndexModel::new(&mut tx)
1191-
.add_application_index(index)
1191+
.add_application_index(TableNamespace::test_user(), index)
11921192
.await?;
11931193
db.commit(tx).await?;
11941194

crates/database/src/table_iteration.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -788,10 +788,10 @@ mod tests {
788788
let index_fields = IndexedFields::try_from(vec![field.clone()])?;
789789
let mut tx = database.begin(Identity::system()).await?;
790790
IndexModel::new(&mut tx)
791-
.add_application_index(IndexMetadata::new_enabled(
792-
index_name.clone(),
793-
index_fields.clone(),
794-
))
791+
.add_application_index(
792+
TableNamespace::test_user(),
793+
IndexMetadata::new_enabled(index_name.clone(), index_fields.clone()),
794+
)
795795
.await?;
796796
database.commit(tx).await?;
797797
IndexWorker::new_terminating(

0 commit comments

Comments
 (0)