Skip to content

Commit e9b682d

Browse files
ldanilekConvex, Inc.
authored and
Convex, Inc.
committed
begin passing ComponentId through file storage (#27609)
whenever you're interacting with file storage, it's within the context of a component. low-level APIs that operate on the table level are given a TableNamespace, while higher level interfaces use ComponentId. We could consider passing it around as ComponentPath instead, but I would rather not convert from ComponentId -> ComponentPath -> ComponentId in places like funrun callbacks. And ComponentId seems to work. this PR passes the component down from a higher layer of code. It doesn't add the component to the protos or urls where it will eventually go. The pattern for passing component id into `TaskExecutor` is the same pattern we use for `resources`, but it feels a little funny to copy that pattern. GitOrigin-RevId: 210eaa19b8282d9c01a9da2971e13b44179585e7
1 parent 62ed975 commit e9b682d

File tree

18 files changed

+170
-57
lines changed

18 files changed

+170
-57
lines changed

crates/application/src/api.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ use std::{
66
use async_trait::async_trait;
77
use bytes::Bytes;
88
use common::{
9-
components::ComponentFunctionPath,
9+
components::{
10+
ComponentFunctionPath,
11+
ComponentId,
12+
},
1013
pause::PauseClient,
1114
runtime::Runtime,
1215
types::{
@@ -165,6 +168,7 @@ pub trait ApplicationApi: Send + Sync {
165168
&self,
166169
host: &str,
167170
request_id: RequestId,
171+
component: ComponentId,
168172
content_length: Option<ContentLength>,
169173
content_type: Option<ContentType>,
170174
expected_sha256: Option<Sha256Digest>,
@@ -175,6 +179,7 @@ pub trait ApplicationApi: Send + Sync {
175179
&self,
176180
host: &str,
177181
request_id: RequestId,
182+
component: ComponentId,
178183
file_storage_id: FileStorageId,
179184
range: (Bound<u64>, Bound<u64>),
180185
) -> anyhow::Result<FileRangeStream>;
@@ -183,6 +188,7 @@ pub trait ApplicationApi: Send + Sync {
183188
&self,
184189
host: &str,
185190
request_id: RequestId,
191+
component: ComponentId,
186192
file_storage_id: FileStorageId,
187193
) -> anyhow::Result<FileStream>;
188194
}
@@ -337,32 +343,41 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
337343
&self,
338344
_host: &str,
339345
_request_id: RequestId,
346+
component: ComponentId,
340347
content_length: Option<ContentLength>,
341348
content_type: Option<ContentType>,
342349
expected_sha256: Option<Sha256Digest>,
343350
body: BoxStream<'_, anyhow::Result<Bytes>>,
344351
) -> anyhow::Result<DeveloperDocumentId> {
345-
self.store_file(content_length, content_type, expected_sha256, body)
346-
.await
352+
self.store_file(
353+
component,
354+
content_length,
355+
content_type,
356+
expected_sha256,
357+
body,
358+
)
359+
.await
347360
}
348361

349362
async fn get_file_range(
350363
&self,
351364
_host: &str,
352365
_request_id: RequestId,
366+
component: ComponentId,
353367
file_storage_id: FileStorageId,
354368
range: (Bound<u64>, Bound<u64>),
355369
) -> anyhow::Result<FileRangeStream> {
356-
self.get_file_range(file_storage_id, range).await
370+
self.get_file_range(component, file_storage_id, range).await
357371
}
358372

359373
async fn get_file(
360374
&self,
361375
_host: &str,
362376
_request_id: RequestId,
377+
component: ComponentId,
363378
file_storage_id: FileStorageId,
364379
) -> anyhow::Result<FileStream> {
365-
self.get_file(file_storage_id).await
380+
self.get_file(component, file_storage_id).await
366381
}
367382
}
368383

crates/application/src/application_function_runner/mod.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use common::{
2121
CanonicalizedComponentFunctionPath,
2222
ComponentDefinitionPath,
2323
ComponentFunctionPath,
24+
ComponentId,
2425
},
2526
errors::JsError,
2627
execution_context::ExecutionContext,
@@ -2067,24 +2068,31 @@ impl<RT: Runtime> ActionCallbacks for ApplicationFunctionRunner<RT> {
20672068
async fn storage_get_url(
20682069
&self,
20692070
identity: Identity,
2071+
component: ComponentId,
20702072
storage_id: FileStorageId,
20712073
) -> anyhow::Result<Option<String>> {
20722074
let mut tx = self.database.begin(identity).await?;
2073-
self.file_storage.get_url(&mut tx, storage_id).await
2075+
self.file_storage
2076+
.get_url(&mut tx, component.into(), storage_id)
2077+
.await
20742078
}
20752079

20762080
async fn storage_get_file_entry(
20772081
&self,
20782082
identity: Identity,
2083+
component: ComponentId,
20792084
storage_id: FileStorageId,
20802085
) -> anyhow::Result<Option<FileStorageEntry>> {
20812086
let mut tx = self.database.begin(identity).await?;
2082-
self.file_storage.get_file_entry(&mut tx, storage_id).await
2087+
self.file_storage
2088+
.get_file_entry(&mut tx, component.into(), storage_id)
2089+
.await
20832090
}
20842091

20852092
async fn storage_store_file_entry(
20862093
&self,
20872094
identity: Identity,
2095+
component: ComponentId,
20882096
entry: FileStorageEntry,
20892097
) -> anyhow::Result<DeveloperDocumentId> {
20902098
let (_ts, id, _stats) = self
@@ -2098,7 +2106,7 @@ impl<RT: Runtime> ActionCallbacks for ApplicationFunctionRunner<RT> {
20982106
async {
20992107
let id = self
21002108
.file_storage
2101-
.store_file_entry(tx, entry.clone())
2109+
.store_file_entry(tx, component.into(), entry.clone())
21022110
.await?;
21032111
Ok(id)
21042112
}
@@ -2112,6 +2120,7 @@ impl<RT: Runtime> ActionCallbacks for ApplicationFunctionRunner<RT> {
21122120
async fn storage_delete(
21132121
&self,
21142122
identity: Identity,
2123+
component: ComponentId,
21152124
storage_id: FileStorageId,
21162125
) -> anyhow::Result<()> {
21172126
self.database
@@ -2122,7 +2131,9 @@ impl<RT: Runtime> ActionCallbacks for ApplicationFunctionRunner<RT> {
21222131
"app_funrun_storage_delete",
21232132
|tx| {
21242133
async {
2125-
self.file_storage.delete(tx, storage_id.clone()).await?;
2134+
self.file_storage
2135+
.delete(tx, component.into(), storage_id.clone())
2136+
.await?;
21262137
Ok(())
21272138
}
21282139
.into()

crates/application/src/export_worker.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,6 +1100,7 @@ mod tests {
11001100
let usage_tracker = FunctionUsageTracker::new();
11011101
let file1_id = file_storage_wrapper
11021102
.store_file(
1103+
TableNamespace::test_user(),
11031104
None,
11041105
Some(ContentType::jpeg()),
11051106
futures::stream::iter(vec![Ok(Bytes::from_static(b"abc"))]),

crates/application/src/lib.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2178,6 +2178,7 @@ impl<RT: Runtime> Application<RT> {
21782178

21792179
pub async fn store_file(
21802180
&self,
2181+
component: ComponentId,
21812182
content_length: Option<ContentLength>,
21822183
content_type: Option<ContentType>,
21832184
expected_sha256: Option<Sha256Digest>,
@@ -2186,6 +2187,7 @@ impl<RT: Runtime> Application<RT> {
21862187
let storage_id = self
21872188
.file_storage
21882189
.store_file(
2190+
component.into(),
21892191
content_length,
21902192
content_type,
21912193
body,
@@ -2198,23 +2200,28 @@ impl<RT: Runtime> Application<RT> {
21982200

21992201
pub async fn store_file_entry(
22002202
&self,
2203+
component: ComponentId,
22012204
entry: FileStorageEntry,
22022205
) -> anyhow::Result<DeveloperDocumentId> {
22032206
let storage_id = self
22042207
.file_storage
2205-
.store_entry(entry, &self.usage_tracking)
2208+
.store_entry(component.into(), entry, &self.usage_tracking)
22062209
.await?;
22072210
Ok(storage_id)
22082211
}
22092212

2210-
pub async fn get_file(&self, storage_id: FileStorageId) -> anyhow::Result<FileStream> {
2213+
pub async fn get_file(
2214+
&self,
2215+
component: ComponentId,
2216+
storage_id: FileStorageId,
2217+
) -> anyhow::Result<FileStream> {
22112218
let mut file_storage_tx = self.begin(Identity::system()).await?;
22122219

22132220
let Some(file_entry) = self
22142221
.file_storage
22152222
.transactional_file_storage
22162223
// The transaction is not part of UDF so use the global usage counters.
2217-
.get_file_entry(&mut file_storage_tx, storage_id.clone())
2224+
.get_file_entry(&mut file_storage_tx, component.into(), storage_id.clone())
22182225
.await?
22192226
else {
22202227
return Err(ErrorMetadata::not_found(
@@ -2234,6 +2241,7 @@ impl<RT: Runtime> Application<RT> {
22342241

22352242
pub async fn get_file_range(
22362243
&self,
2244+
component: ComponentId,
22372245
storage_id: FileStorageId,
22382246
bytes_range: (Bound<u64>, Bound<u64>),
22392247
) -> anyhow::Result<FileRangeStream> {
@@ -2243,7 +2251,7 @@ impl<RT: Runtime> Application<RT> {
22432251
.file_storage
22442252
.transactional_file_storage
22452253
// The transaction is not part of UDF so use the global usage counters.
2246-
.get_file_entry(&mut file_storage_tx, storage_id.clone())
2254+
.get_file_entry(&mut file_storage_tx, component.into(), storage_id.clone())
22472255
.await?
22482256
else {
22492257
return Err(ErrorMetadata::not_found(

crates/file_storage/src/core.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,10 @@ use usage_tracking::{
5454
StorageCallTracker,
5555
StorageUsageTracker,
5656
};
57-
use value::id_v6::DeveloperDocumentId;
57+
use value::{
58+
id_v6::DeveloperDocumentId,
59+
TableNamespace,
60+
};
5861

5962
use crate::{
6063
metrics::{
@@ -93,9 +96,10 @@ impl<RT: Runtime> TransactionalFileStorage<RT> {
9396
pub async fn get_url(
9497
&self,
9598
tx: &mut Transaction<RT>,
99+
namespace: TableNamespace,
96100
storage_id: FileStorageId,
97101
) -> anyhow::Result<Option<String>> {
98-
self.get_url_batch(tx, btreemap! { 0 => storage_id })
102+
self.get_url_batch(tx, namespace, btreemap! { 0 => storage_id })
99103
.await
100104
.remove(&0)
101105
.context("batch_key missing")?
@@ -104,10 +108,11 @@ impl<RT: Runtime> TransactionalFileStorage<RT> {
104108
pub async fn get_url_batch(
105109
&self,
106110
tx: &mut Transaction<RT>,
111+
namespace: TableNamespace,
107112
storage_ids: BTreeMap<BatchKey, FileStorageId>,
108113
) -> BTreeMap<BatchKey, anyhow::Result<Option<String>>> {
109114
let origin = &self.convex_origin;
110-
let files = self.get_file_entry_batch(tx, storage_ids).await;
115+
let files = self.get_file_entry_batch(tx, namespace, storage_ids).await;
111116
files
112117
.into_iter()
113118
.map(|(batch_key, result)| {
@@ -124,9 +129,10 @@ impl<RT: Runtime> TransactionalFileStorage<RT> {
124129
pub async fn delete(
125130
&self,
126131
tx: &mut Transaction<RT>,
132+
namespace: TableNamespace,
127133
storage_id: FileStorageId,
128134
) -> anyhow::Result<()> {
129-
let success = self._delete(tx, storage_id.clone()).await?;
135+
let success = self._delete(tx, namespace, storage_id.clone()).await?;
130136
if !success {
131137
anyhow::bail!(ErrorMetadata::not_found(
132138
"StorageIdNotFound",
@@ -139,9 +145,10 @@ impl<RT: Runtime> TransactionalFileStorage<RT> {
139145
pub async fn get_file_entry(
140146
&self,
141147
tx: &mut Transaction<RT>,
148+
namespace: TableNamespace,
142149
storage_id: FileStorageId,
143150
) -> anyhow::Result<Option<FileStorageEntry>> {
144-
self.get_file_entry_batch(tx, btreemap! { 0 => storage_id })
151+
self.get_file_entry_batch(tx, namespace, btreemap! { 0 => storage_id })
145152
.await
146153
.remove(&0)
147154
.context("batch_key missing")?
@@ -150,9 +157,10 @@ impl<RT: Runtime> TransactionalFileStorage<RT> {
150157
pub async fn get_file_entry_batch(
151158
&self,
152159
tx: &mut Transaction<RT>,
160+
namespace: TableNamespace,
153161
storage_ids: BTreeMap<BatchKey, FileStorageId>,
154162
) -> BTreeMap<BatchKey, anyhow::Result<Option<FileStorageEntry>>> {
155-
FileStorageModel::new(tx)
163+
FileStorageModel::new(tx, namespace)
156164
.get_file_batch(storage_ids)
157165
.await
158166
.into_iter()
@@ -275,9 +283,10 @@ impl<RT: Runtime> TransactionalFileStorage<RT> {
275283
async fn _delete(
276284
&self,
277285
tx: &mut Transaction<RT>,
286+
namespace: TableNamespace,
278287
storage_id: FileStorageId,
279288
) -> anyhow::Result<bool> {
280-
let did_delete = FileStorageModel::new(tx)
289+
let did_delete = FileStorageModel::new(tx, namespace)
281290
.delete_file(storage_id, Identity::system())
282291
.await?
283292
.is_some();
@@ -341,10 +350,13 @@ impl<RT: Runtime> TransactionalFileStorage<RT> {
341350
pub async fn store_file_entry(
342351
&self,
343352
tx: &mut Transaction<RT>,
353+
namespace: TableNamespace,
344354
entry: FileStorageEntry,
345355
) -> anyhow::Result<DeveloperDocumentId> {
346356
let table_mapping = tx.table_mapping().clone();
347-
let system_doc_id = FileStorageModel::new(tx).store_file(entry).await?;
357+
let system_doc_id = FileStorageModel::new(tx, namespace)
358+
.store_file(entry)
359+
.await?;
348360
let virtual_id = tx
349361
.virtual_system_mapping()
350362
.system_resolved_id_to_virtual_developer_id(
@@ -360,6 +372,7 @@ impl<RT: Runtime> TransactionalFileStorage<RT> {
360372
impl<RT: Runtime> FileStorage<RT> {
361373
pub async fn store_file(
362374
&self,
375+
namespace: TableNamespace,
363376
content_length: Option<ContentLength>,
364377
content_type: Option<ContentType>,
365378
file: impl Stream<Item = anyhow::Result<impl Into<Bytes>>> + Send,
@@ -370,13 +383,14 @@ impl<RT: Runtime> FileStorage<RT> {
370383
.transactional_file_storage
371384
.upload_file(content_length, content_type, file, expected_sha256)
372385
.await?;
373-
self.store_entry(entry, usage_tracker).await
386+
self.store_entry(namespace, entry, usage_tracker).await
374387
}
375388

376389
/// Record the existence of a file that has already been uploaded to the
377390
/// underlying storage implementation.
378391
pub async fn store_entry(
379392
&self,
393+
namespace: TableNamespace,
380394
entry: FileStorageEntry,
381395
usage_tracker: &dyn StorageUsageTracker,
382396
) -> anyhow::Result<DeveloperDocumentId> {
@@ -386,7 +400,7 @@ impl<RT: Runtime> FileStorage<RT> {
386400
let mut tx = self.database.begin(Identity::system()).await?;
387401
let virtual_id = self
388402
.transactional_file_storage
389-
.store_file_entry(&mut tx, entry)
403+
.store_file_entry(&mut tx, namespace, entry)
390404
.await?;
391405
self.database
392406
.commit_with_write_source(tx, "file_storage_store_file")

crates/file_storage/src/tests.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use model::{
2525
use runtime::testing::TestRuntime;
2626
use storage::LocalDirStorage;
2727
use usage_tracking::UsageCounter;
28+
use value::TableNamespace;
2829

2930
use super::FileStorage;
3031
use crate::TransactionalFileStorage;
@@ -50,7 +51,7 @@ async fn test_get_file_404(rt: TestRuntime) -> anyhow::Result<()> {
5051
let mut tx = database.begin(Identity::system()).await?;
5152
assert!(file_storage
5253
.transactional_file_storage
53-
.get_file_entry(&mut tx, bogus_storage_id)
54+
.get_file_entry(&mut tx, TableNamespace::test_user(), bogus_storage_id)
5455
.await?
5556
.is_none());
5657

@@ -68,6 +69,7 @@ async fn test_store_file_sha_mismatch(rt: TestRuntime) -> anyhow::Result<()> {
6869
let wrong = Sha256::hash(b"Wrong thing");
6970
let err: ErrorMetadata = file_storage
7071
.store_file(
72+
TableNamespace::test_user(),
7173
None,
7274
None,
7375
stream::iter([Ok(big_file)]),

0 commit comments

Comments
 (0)