Add support for glob string in datafusion-cli query #16332

Open · wants to merge 11 commits into main
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions datafusion-cli/Cargo.toml
@@ -55,6 +55,7 @@ datafusion = { workspace = true, features = [
dirs = "6.0.0"
env_logger = { workspace = true }
futures = { workspace = true }
glob = "0.3.0"
mimalloc = { version = "0.1", default-features = false }
object_store = { workspace = true, features = ["aws", "gcp", "http"] }
parking_lot = { workspace = true }
104 changes: 103 additions & 1 deletion datafusion-cli/src/functions.rs
@@ -27,7 +27,13 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use datafusion::catalog::{Session, TableFunctionImpl};
use datafusion::common::{plan_err, Column};
use datafusion::common::{plan_datafusion_err, plan_err, Column};
use datafusion::datasource::file_format::{
csv::CsvFormat, json::JsonFormat as NdJsonFormat, parquet::ParquetFormat, FileFormat,
};
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
@@ -42,6 +48,13 @@ use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::statistics::Statistics;

use datafusion::prelude::SessionContext;

use futures::executor::block_on;

use glob::Pattern;
use url::Url;

#[derive(Debug)]
pub enum Function {
Select,
@@ -460,3 +473,92 @@ impl TableFunctionImpl for ParquetMetadataFunc {
Ok(Arc::new(parquet_metadata))
}
}

/// A table function that allows users to query files using glob patterns
/// for example: SELECT * FROM glob('path/to/*/file.parquet')
pub struct GlobFunc {
// we need the ctx here to get the schema from the listing table later
ctx: SessionContext,
}

impl std::fmt::Debug for GlobFunc {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GlobFunc")
.field("ctx", &"<SessionContext>")
.finish()
}
}

impl GlobFunc {
/// Create a new GlobFunc
pub fn new(ctx: SessionContext) -> Self {
Self { ctx }
}
}

fn as_utf8_literal<'a>(expr: &'a Expr, arg_name: &str) -> Result<&'a str> {
match expr {
Expr::Literal(ScalarValue::Utf8(Some(s)), _) => Ok(s),
Review comment (Contributor):

Minor: Maybe this could use the try_as_str function (which would also handle other literal types): https://docs.rs/datafusion/latest/datafusion/scalar/enum.ScalarValue.html#method.try_as_str

Expr::Column(Column { name, .. }) => Ok(name),
_ => plan_err!("glob() requires a string literal for the '{arg_name}' argument"),
}
}
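The try_as_str suggestion from the review would mainly change how the nested option is flattened. A stand-alone sketch, not datafusion code: per the docs linked above, ScalarValue::try_as_str returns Option<Option<&str>> (outer None means not a string-like type, inner None means a string-typed SQL NULL); that return value is modeled here as a plain parameter, and the helper name literal_as_str is illustrative.

```rust
// Sketch only: `scalar` stands in for the Option<Option<&str>> that
// ScalarValue::try_as_str returns (per the docs linked in the review).
// Outer None: the literal is not a string-like type.
// Inner None: the literal is a string-typed SQL NULL.
fn literal_as_str<'a>(
    scalar: Option<Option<&'a str>>,
    arg_name: &str,
) -> Result<&'a str, String> {
    match scalar {
        Some(Some(s)) => Ok(s),
        Some(None) => Err(format!("glob() got a NULL '{arg_name}' argument")),
        None => Err(format!(
            "glob() requires a string literal for the '{arg_name}' argument"
        )),
    }
}

fn main() {
    assert_eq!(literal_as_str(Some(Some("*.csv")), "pattern"), Ok("*.csv"));
    assert!(literal_as_str(Some(None), "pattern").is_err());
    assert!(literal_as_str(None, "format").is_err());
}
```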

impl TableFunctionImpl for GlobFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
// Parse arguments
if exprs.is_empty() {
return plan_err!("glob() requires a glob pattern");
}
let pattern = as_utf8_literal(&exprs[0], "pattern")?;
let format = if exprs.len() > 1 {
Some(as_utf8_literal(&exprs[1], "format")?)
} else {
None
};

// Create ListingTableUrl - distinguish between URLs with schemes and local paths
let url = if pattern.contains("://") && pattern.contains(['*', '?', '[']) {
// URL with scheme and glob - split manually to avoid URL encoding of glob chars
let glob_pos = pattern.find(['*', '?', '[']).unwrap(); // we already checked it exists
let split_pos = pattern[..glob_pos].rfind('/').unwrap() + 1; // find last '/' before glob
let (base_path, glob_part) = pattern.split_at(split_pos);

let base_url =
Url::parse(&format!("{}/", base_path.trim_end_matches('/')))
.map_err(|e| plan_datafusion_err!("Invalid base URL: {}", e))?;
let glob = Pattern::new(glob_part)
.map_err(|e| plan_datafusion_err!("Invalid glob pattern: {}", e))?;
ListingTableUrl::try_new(base_url, Some(glob))?
} else {
// Local path or URL without globs - parse() handles this correctly
ListingTableUrl::parse(pattern)?
};

// Determine file format for the table configuration
let file_extension = format
.or_else(|| {
// Extract extension from original pattern (before any URL manipulation)
pattern.split('/').next_back()?.split('.').next_back()
})
.unwrap_or("parquet");

let file_format: Arc<dyn FileFormat> = match file_extension {
"parquet" => Arc::new(ParquetFormat::default()),
"csv" => Arc::new(CsvFormat::default().with_has_header(true)),
"json" => Arc::new(NdJsonFormat::default()),
other => return plan_err!("glob(): unsupported format '{other}'"),
};

// Create the listing table. Blocking is needed here because schema inference is an async I/O operation and call() is synchronous.
let listing_opts =
ListingOptions::new(file_format).with_file_extension(file_extension);
let schema = block_on(listing_opts.infer_schema(&self.ctx.state(), &url))?;
let config = ListingTableConfig::new(url)
.with_listing_options(listing_opts)
.with_schema(schema);
let table = ListingTable::try_new(config)?;

Ok(Arc::new(table))
}
}
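The two heuristics in call() above — splitting the pattern at the first glob metacharacter, and falling back from an explicit format argument to the pattern's file extension — can be exercised in isolation. A minimal std-only sketch (the function names split_glob and infer_extension are illustrative, not part of the PR; the real code feeds the pieces into Url::parse and ListingTableUrl::try_new):

```rust
/// Split a URL-with-glob into (base path, glob part): find the first
/// glob metacharacter, then the last '/' before it. Returns None when
/// the pattern contains no glob characters.
fn split_glob(pattern: &str) -> Option<(&str, &str)> {
    let glob_pos = pattern.find(['*', '?', '['])?;
    let split_pos = pattern[..glob_pos].rfind('/')? + 1;
    Some(pattern.split_at(split_pos))
}

/// Prefer the explicit format argument; otherwise take the extension of
/// the pattern's last path segment; otherwise default to "parquet".
fn infer_extension<'a>(pattern: &'a str, explicit: Option<&'a str>) -> &'a str {
    explicit
        .or_else(|| pattern.split('/').next_back()?.split('.').next_back())
        .unwrap_or("parquet")
}

fn main() {
    assert_eq!(
        split_glob("s3://bucket/data/*.parquet"),
        Some(("s3://bucket/data/", "*.parquet"))
    );
    assert_eq!(split_glob("plain/path.csv"), None);
    assert_eq!(infer_extension("data/exa*.csv", None), "csv");
    assert_eq!(infer_extension("data/1.json", Some("json")), "json");
}
```

Note that for a segment with no dot (e.g. "data/file"), infer_extension returns the whole segment, mirroring the PR code, which would then reject it as an unsupported format.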
15 changes: 13 additions & 2 deletions datafusion-cli/src/main.rs
@@ -30,7 +30,7 @@ use datafusion::execution::memory_pool::{
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
use datafusion_cli::functions::ParquetMetadataFunc;
use datafusion_cli::functions::{GlobFunc, ParquetMetadataFunc};
use datafusion_cli::{
exec,
pool_type::PoolType,
@@ -217,7 +217,7 @@ async fn main_inner() -> Result<()> {
ctx.state_weak_ref(),
)));
// register `parquet_metadata` table function to get metadata from parquet files
ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));
register_builtin_functions(&ctx)?;

let mut print_options = PrintOptions {
format: args.format,
@@ -394,6 +394,17 @@ pub fn extract_disk_limit(size: &str) -> Result<usize, String> {
parse_size_string(size, "disk limit")
}

fn register_builtin_functions(ctx: &SessionContext) -> Result<()> {
// register parquet_metadata
ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));

// register glob
let glob_func = Arc::new(GlobFunc::new(ctx.clone()));
ctx.register_udtf("glob", glob_func);

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
57 changes: 57 additions & 0 deletions datafusion-cli/tests/snapshots/cli@glob_test.sql.snap
@@ -0,0 +1,57 @@
---
source: datafusion-cli/tests/cli_integration.rs
assertion_line: 173
info:
program: datafusion-cli
args: []
stdin: "-- Test glob function with files available in CI\n-- Test 1: Single CSV file - verify basic functionality\nSELECT COUNT(*) AS cars_count FROM glob('../datafusion/core/tests/data/cars.csv');\n\n-- Test 2: Data aggregation from CSV file - verify actual data reading\nSELECT car, COUNT(*) as count FROM glob('../datafusion/core/tests/data/cars.csv') GROUP BY car ORDER BY car;\n\n-- Test 3: JSON file with explicit format parameter - verify format specification\nSELECT COUNT(*) AS json_count FROM glob('../datafusion/core/tests/data/1.json', 'json');\n\n-- Test 4: Single specific CSV file - verify another CSV works\nSELECT COUNT(*) AS example_count FROM glob('../datafusion/core/tests/data/example.csv');\n\n-- Test 5: Glob pattern with wildcard - test actual glob functionality\nSELECT COUNT(*) AS glob_pattern_count FROM glob('../datafusion/core/tests/data/exa*.csv'); "
input_file: datafusion-cli/tests/sql/integration/glob_test.sql
---
success: true
exit_code: 0
----- stdout -----
[CLI_VERSION]
+------------+
| cars_count |
+------------+
| 25 |
+------------+
1 row(s) fetched.
[ELAPSED]

+-------+-------+
| car | count |
+-------+-------+
| green | 12 |
| red | 13 |
+-------+-------+
2 row(s) fetched.
[ELAPSED]

+------------+
| json_count |
+------------+
| 4 |
+------------+
1 row(s) fetched.
[ELAPSED]

+---------------+
| example_count |
+---------------+
| 1 |
+---------------+
1 row(s) fetched.
[ELAPSED]

+--------------------+
| glob_pattern_count |
+--------------------+
| 4 |
+--------------------+
1 row(s) fetched.
[ELAPSED]

\q

----- stderr -----
15 changes: 15 additions & 0 deletions datafusion-cli/tests/sql/integration/glob_test.sql
@@ -0,0 +1,15 @@
-- Test glob function with files available in CI
-- Test 1: Single CSV file - verify basic functionality
SELECT COUNT(*) AS cars_count FROM glob('../datafusion/core/tests/data/cars.csv');

-- Test 2: Data aggregation from CSV file - verify actual data reading
SELECT car, COUNT(*) as count FROM glob('../datafusion/core/tests/data/cars.csv') GROUP BY car ORDER BY car;
Review comment (Contributor):

I think another use case that @robtandy had was "a list of multiple files" -- like is there some way to select exactly two files? Something like

glob('[../datafusion/core/tests/data/cars.csv',  '../datafusion/core/tests/data/trucks.csv', ])

Perhaps 🤔


-- Test 3: JSON file with explicit format parameter - verify format specification
SELECT COUNT(*) AS json_count FROM glob('../datafusion/core/tests/data/1.json', 'json');

-- Test 4: Single specific CSV file - verify another CSV works
SELECT COUNT(*) AS example_count FROM glob('../datafusion/core/tests/data/example.csv');

-- Test 5: Glob pattern with wildcard - test actual glob functionality
SELECT COUNT(*) AS glob_pattern_count FROM glob('../datafusion/core/tests/data/exa*.csv');
Review comment (Contributor):

Should we introduce a new function, or can we reuse the current model?

What should the behavior be if there are mixed CSV/JSON/Parquet files in the folder?

Reply from the author (@a-agmon, Jun 8, 2025):

We can use the current model, but I think it will require touching some core modules within datafusion.

Re the second question, I think that mixing multiple file types should not be supported.

Reply from the author (@a-agmon, Jun 8, 2025):

By the way, the current implementation supports the following - but just for local files (as it uses ::parse())

CREATE EXTERNAL TABLE logs 
STORED AS CSV 
LOCATION '/data/*_small.csv';

Review comment (Contributor):

Another possibility would be to intercept the CREATE EXTERNAL TABLE command in datafusion-cli itself

For example, similarly to how it peeks here:

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
// To support custom formats, treat error as None
let format = config_file_type_from_str(&cmd.file_type);
register_object_store_and_config_extensions(
ctx,
&cmd.location,
&cmd.options,
format,
)
.await?;
}

We could implement a special handler in datafusion-cli rather than use the default one in SessionContext:

DdlStatement::CreateExternalTable(cmd) => {
(Box::pin(async move { self.create_external_table(&cmd).await })
as std::pin::Pin<Box<dyn futures::Future<Output = _> + Send>>)
.await