Skip to content

Commit 7d52d79

Browse files
nipunn1313Convex, Inc.
authored and
Convex, Inc.
committed
Upgrade async_zip (#27417)
The new interface is rather nice - as it uses the type system to alternate between zipfile and zipreader. GitOrigin-RevId: 38158a31fc97b5e6ce21a5a8bca6b8a7a182cdc1
1 parent fcdb2d3 commit 7d52d79

File tree

12 files changed

+278
-204
lines changed

12 files changed

+278
-204
lines changed

Cargo.lock

Lines changed: 27 additions & 41 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ async-channel = "1.9.0"
1111
async-compression = { version = "0.4.10", features = [ "tokio", "zstd", "gzip" ] }
1212
async-recursion = "1.1.1"
1313
async-trait = "0.1"
14-
async_zip = { version = "0.0.9", default-features = false, features = [ "zstd", "deflate" ] }
14+
async_zip = { version = "0.0.17", default-features = false, features = [ "zstd", "deflate", "tokio" ] }
1515
cbc = { version = "0.1.2" }
1616
csv-async = "1.2"
1717
atomic_refcell = "0.1.13"

crates/application/src/export_worker.rs

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,12 @@ use std::{
99

1010
use anyhow::Context;
1111
use async_zip::{
12-
write::{
12+
tokio::write::{
1313
EntryStreamWriter,
1414
ZipFileWriter,
1515
},
1616
Compression,
1717
ZipEntryBuilder,
18-
ZipEntryBuilderExt,
1918
};
2019
use bytes::Bytes;
2120
use common::{
@@ -723,7 +722,7 @@ impl<'a, 'b> ZipSnapshotTableUpload<'a, 'b> {
723722
table_name: TableName,
724723
) -> anyhow::Result<Self> {
725724
let source_path = format!("{table_name}/documents.jsonl");
726-
let builder = ZipEntryBuilder::new(source_path.clone(), Compression::Deflate)
725+
let builder = ZipEntryBuilder::new(source_path.into(), Compression::Deflate)
727726
.unix_permissions(ZIP_ENTRY_PERMISSIONS);
728727
let entry_writer = zip_writer.write_entry_stream(builder.build()).await?;
729728
Ok(Self { entry_writer })
@@ -736,11 +735,8 @@ impl<'a, 'b> ZipSnapshotTableUpload<'a, 'b> {
736735

737736
async fn write_json_line(&mut self, json: JsonValue) -> anyhow::Result<()> {
738737
let buf = serde_json::to_vec(&json)?;
739-
self.entry_writer.compat_mut_write().write_all(&buf).await?;
740-
self.entry_writer
741-
.compat_mut_write()
742-
.write_all(&AFTER_DOCUMENTS_CLEAN)
743-
.await?;
738+
self.entry_writer.write_all(&buf).await?;
739+
self.entry_writer.write_all(&AFTER_DOCUMENTS_CLEAN).await?;
744740
Ok(())
745741
}
746742

@@ -756,7 +752,7 @@ struct ZipSnapshotUpload<'a> {
756752

757753
impl<'a> ZipSnapshotUpload<'a> {
758754
async fn new(out: &'a mut ChannelWriter) -> anyhow::Result<Self> {
759-
let writer = ZipFileWriter::new(out);
755+
let writer = ZipFileWriter::with_tokio(out);
760756
let mut zip_snapshot_upload = Self { writer };
761757
zip_snapshot_upload
762758
.write_full_file(format!("README.md"), README_MD_CONTENTS)
@@ -765,13 +761,10 @@ impl<'a> ZipSnapshotUpload<'a> {
765761
}
766762

767763
async fn write_full_file(&mut self, path: String, contents: &str) -> anyhow::Result<()> {
768-
let builder = ZipEntryBuilder::new(path, Compression::Deflate)
764+
let builder = ZipEntryBuilder::new(path.into(), Compression::Deflate)
769765
.unix_permissions(ZIP_ENTRY_PERMISSIONS);
770766
let mut entry_writer = self.writer.write_entry_stream(builder.build()).await?;
771-
entry_writer
772-
.compat_mut_write()
773-
.write_all(contents.as_bytes())
774-
.await?;
767+
entry_writer.write_all(contents.as_bytes()).await?;
775768
entry_writer.close().await?;
776769
Ok(())
777770
}
@@ -781,11 +774,11 @@ impl<'a> ZipSnapshotUpload<'a> {
781774
path: String,
782775
mut contents: BoxStream<'_, std::io::Result<Bytes>>,
783776
) -> anyhow::Result<()> {
784-
let builder = ZipEntryBuilder::new(path, Compression::Deflate)
777+
let builder = ZipEntryBuilder::new(path.into(), Compression::Deflate)
785778
.unix_permissions(ZIP_ENTRY_PERMISSIONS);
786779
let mut entry_writer = self.writer.write_entry_stream(builder.build()).await?;
787780
while let Some(chunk) = contents.try_next().await? {
788-
entry_writer.compat_mut_write().write_all(&chunk).await?;
781+
entry_writer.write_all(&chunk).await?;
789782
}
790783
entry_writer.close().await?;
791784
Ok(())
@@ -817,23 +810,21 @@ impl<'a> ZipSnapshotUpload<'a> {
817810
generated_schema: GeneratedSchema<T>,
818811
) -> anyhow::Result<()> {
819812
let generated_schema_path = format!("{table_name}/generated_schema.jsonl");
820-
let builder = ZipEntryBuilder::new(generated_schema_path.clone(), Compression::Deflate)
813+
let builder = ZipEntryBuilder::new(generated_schema_path.into(), Compression::Deflate)
821814
.unix_permissions(ZIP_ENTRY_PERMISSIONS);
822815
let mut entry_writer = self.writer.write_entry_stream(builder.build()).await?;
823816
let generated_schema_str = generated_schema.inferred_shape.to_string();
824817
entry_writer
825-
.compat_mut_write()
826818
.write_all(serde_json::to_string(&generated_schema_str)?.as_bytes())
827819
.await?;
828-
entry_writer.compat_mut_write().write_all(b"\n").await?;
820+
entry_writer.write_all(b"\n").await?;
829821
for (override_id, override_export_context) in generated_schema.overrides.into_iter() {
830822
let override_json =
831823
json!({override_id.encode(): JsonValue::from(override_export_context)});
832824
entry_writer
833-
.compat_mut_write()
834825
.write_all(serde_json::to_string(&override_json)?.as_bytes())
835826
.await?;
836-
entry_writer.compat_mut_write().write_all(b"\n").await?;
827+
entry_writer.write_all(b"\n").await?;
837828
}
838829
entry_writer.close().await?;
839830
Ok(())
@@ -857,6 +848,7 @@ mod tests {
857848
use anyhow::Context;
858849
use bytes::Bytes;
859850
use common::{
851+
async_zip_ext::ZipEntryReaderExt,
860852
document::{
861853
ParsedDocument,
862854
ResolvedDocument,
@@ -1048,16 +1040,20 @@ mod tests {
10481040
.await?
10491041
.context("object missing from storage")?;
10501042
let stored_bytes = storage_stream.collect_as_bytes().await?;
1051-
let mut zip_reader = async_zip::read::mem::ZipFileReader::new(&stored_bytes).await?;
1043+
let zip_reader =
1044+
async_zip::base::read::mem::ZipFileReader::new(stored_bytes.into()).await?;
10521045
let mut zip_entries = BTreeMap::new();
10531046
let filenames: Vec<_> = zip_reader
1047+
.file()
10541048
.entries()
1055-
.into_iter()
1056-
.map(|entry| entry.filename().to_string())
1049+
.iter()
1050+
.map(|entry| entry.filename().as_str().unwrap().to_string())
10571051
.collect();
10581052
for (i, filename) in filenames.into_iter().enumerate() {
1059-
let entry_reader = zip_reader.entry_reader(i).await?;
1060-
let entry_contents = String::from_utf8(entry_reader.read_to_end_crc().await?)?;
1053+
let mut entry_reader = zip_reader.reader_with_entry(i).await?;
1054+
let entry_contents = entry_reader
1055+
.read_to_string_checked_bypass_async_zip_crc_bug()
1056+
.await?;
10611057
zip_entries.insert(filename, entry_contents);
10621058
}
10631059
assert_eq!(zip_entries, expected_export_entries);
@@ -1141,16 +1137,20 @@ mod tests {
11411137
.await?
11421138
.context("object missing from storage")?;
11431139
let stored_bytes = storage_stream.collect_as_bytes().await?;
1144-
let mut zip_reader = async_zip::read::mem::ZipFileReader::new(&stored_bytes).await?;
1140+
let zip_reader =
1141+
async_zip::base::read::mem::ZipFileReader::new(stored_bytes.into()).await?;
11451142
let mut zip_entries = BTreeMap::new();
11461143
let filenames: Vec<_> = zip_reader
1144+
.file()
11471145
.entries()
1148-
.into_iter()
1149-
.map(|entry| entry.filename().to_string())
1146+
.iter()
1147+
.map(|entry| entry.filename().as_str().unwrap().to_string())
11501148
.collect();
11511149
for (i, filename) in filenames.into_iter().enumerate() {
1152-
let entry_reader = zip_reader.entry_reader(i).await?;
1153-
let entry_contents = String::from_utf8(entry_reader.read_to_end_crc().await?)?;
1150+
let mut entry_reader = zip_reader.reader_with_entry(i).await?;
1151+
let entry_contents = entry_reader
1152+
.read_to_string_checked_bypass_async_zip_crc_bug()
1153+
.await?;
11541154
zip_entries.insert(filename, entry_contents);
11551155
}
11561156
assert_eq!(zip_entries, expected_export_entries);

0 commit comments

Comments
 (0)