How to connect to data on a filesystem using Spark
This guide will help you connect to your data stored on a filesystem using Spark. This will allow you to validate and explore your data.
Prerequisites: This how-to guide assumes you have:
- Completed the Getting Started Tutorial
- Have a working installation of Great Expectations
- Have access to a working Spark installation
- Have access to data on a filesystem
#
Steps#
1. Choose how to run the code in this guideGet an environment to run the code in this guide. Please choose an option below.
- CLI + filesystem
- No CLI + filesystem
- No CLI + no filesystem
If you use the Great Expectations CLI, run this command to automatically generate a pre-configured Jupyter Notebook. Then you can follow along in the YAML-based workflow below:
great_expectations datasource new
If you use Great Expectations in an environment that has filesystem access, and prefer not to use the CLI, run the code in this guide in a notebook or other Python script.
If you use Great Expectations in an environment that has no filesystem (such as Databricks or AWS EMR), run the code in this guide in that system's preferred way.
#
2. π‘ Instantiate your project's DataContextImport these necessary packages and modules.
from ruamel import yaml
import great_expectations as gefrom great_expectations.core.batch import BatchRequest, RuntimeBatchRequest
Load your DataContext into memory
Use one of the guides below based on your deployment:
Please proceed only after you have instantiated your DataContext
.
#
3. Configure your DatasourceUsing this example configuration, add in your path to a directory that contains some of your data:
- YAML
- Python
datasource_yaml = fr"""name: my_filesystem_datasourceclass_name: Datasourceexecution_engine: class_name: SparkDFExecutionEnginedata_connectors: default_runtime_data_connector_name: class_name: RuntimeDataConnector batch_identifiers: - default_identifier_name default_inferred_data_connector_name: class_name: InferredAssetFilesystemDataConnector base_directory: <YOUR_PATH> default_regex: group_names: - data_asset_name pattern: (.*)\.csv"""
Run this code to test your configuration.
context.test_yaml_config(datasource_yaml)
datasource_config = { "name": "my_filesystem_datasource", "class_name": "Datasource", "execution_engine": {"class_name": "SparkDFExecutionEngine"}, "data_connectors": { "default_runtime_data_connector_name": { "class_name": "RuntimeDataConnector", "batch_identifiers": ["default_identifier_name"], }, "default_inferred_data_connector_name": { "class_name": "InferredAssetFilesystemDataConnector", "base_directory": "<YOUR_PATH>", "default_regex": { "group_names": ["data_asset_name"], "pattern": "(.*)\\.csv", }, }, },}
Run this code to test your configuration.
context.test_yaml_config(yaml.dump(datasource_config))
If you specified a path containing CSV files you will see them listed as Available data_asset_names
in the output of test_yaml_config()
.
Feel free to adjust your configuration and re-run test_yaml_config()
as needed.
#
4. Save the Datasource configuration to your DataContextSave the configuration into your DataContext
by using the add_datasource()
function.
- YAML
- Python
context.add_datasource(**yaml.load(datasource_yaml))
context.add_datasource(**datasource_config)
#
5. Test your new DatasourceVerify your new Datasource by loading data from it into a Validator
using a BatchRequest
.
- Specify a path to single CSV
- Specify a data_asset_name
Add the path to your CSV in the path
key under runtime_parameters
in your BatchRequest
.
batch_request = RuntimeBatchRequest( datasource_name="my_filesystem_datasource", data_connector_name="default_runtime_data_connector_name", data_asset_name="<YOUR_MEANGINGFUL_NAME>", # this can be anything that identifies this data_asset for you runtime_parameters={"path": "<PATH_TO_YOUR_DATA_HERE>"}, # Add your path here. batch_identifiers={"default_identifier_name": "default_identifier"},)
Then load data into the Validator
.
context.create_expectation_suite( expectation_suite_name="test_suite", overwrite_existing=True)validator = context.get_validator( batch_request=batch_request, expectation_suite_name="test_suite")print(validator.head())
Add the name of the data asset to the data_asset_name
in your BatchRequest
.
batch_request = BatchRequest( datasource_name="my_filesystem_datasource", data_connector_name="default_inferred_data_connector_name", data_asset_name="<YOUR_DATA_ASSET_NAME>",)
Then load data into the Validator
.
context.create_expectation_suite( expectation_suite_name="test_suite", overwrite_existing=True)validator = context.get_validator( batch_request=batch_request, expectation_suite_name="test_suite")print(validator.head())
ππ Congratulations! ππ You successfully connected Great Expectations with your data.
#
Additional Notes#
How to read-in multiple CSVs as a single Spark DataframeMore advanced configuration for reading in CSV files through the SparkDFExecutionEngine
is possible through the batch_spec_passthrough
parameter. batch_spec_passthrough
allows for reader-methods to be directly specified,
and backend-specific reader_options
to be passed through to the actual reader-method, in this case spark.read.csv()
. The following example shows how batch_spec_passthrough
parameters can be added to the BatchRequest
. However,
the same parameters can be added to the Datasource configuration at the DataConnector level.
If you have a directory with 3 CSV files with each file having 10,000 lines each:
taxi_data_files/yellow_tripdata_sample_2019-1.csv taxi_data_files/yellow_tripdata_sample_2019-2.csv taxi_data_files/yellow_tripdata_sample_2019-3.csv
You could write a BatchRequest
that reads in the entire folder as a single Spark Dataframe by specifying the reader_method
to be csv
, header
to be set to True
in the reader_options
.
batch_request = RuntimeBatchRequest( datasource_name="my_filesystem_datasource", data_connector_name="default_runtime_data_connector_name", data_asset_name="example_data_asset", runtime_parameters={"path": "taxi_data_files"}, batch_identifiers={"default_identifier_name": 1234567890}, batch_spec_passthrough={"reader_method": "csv", "reader_options": {"header": True}},)
Once that step is complete, then we can confirm that our Validator
contains a batch with the expected 30,000 lines.
context.create_expectation_suite( expectation_suite_name="test_suite", overwrite_existing=True)validator = context.get_validator( batch_request=batch_request, expectation_suite_name="test_suite")
print(validator.head())print(validator.active_batch.data.dataframe.count()) # should be 30,000
If you are working with nonstandard CSVs, read one of these guides:
- How to work with headerless CSVs in Spark
- How to work with custom delimited CSVs in Spark
- How to work with parquet files in Spark
To view the full scripts used in this page, see them on GitHub:
#
Next StepsNow that you've connected to your data, you'll want to work on these core skills: