Beam SQL DDL Overview

Beam SQL provides a standard three-level hierarchy to manage metadata across external data sources, enabling structured discovery and cross-source interoperability.

  1. Catalog: The top-level container representing an external metadata provider. Examples include a Hive Metastore, AWS Glue, or a BigLake Catalog.
  2. Database: A logical grouping within a Catalog. This typically maps to a “Schema” in traditional RDBMS or a “Namespace” in systems like Apache Iceberg.
  3. Table: The leaf node containing the schema definition and the underlying data.

This structure enables federated querying. Because Beam can resolve multiple Catalogs simultaneously, you can build pipelines that bridge disparate environments within a single SQL statement (e.g., joining a production BigQuery table with a development Iceberg dataset in GCS).

By using fully qualified names (e.g., catalog.database.table), you can perform cross-catalog joins or migrate data between clouds without manual schema mapping or intermediate storage.
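
The statements below sketch both use cases. They assume the following:

  1. The prod_iceberg and local_catalog catalogs from the CREATE CATALOG examples later on this page are registered.
  2. prod_iceberg.sales_data.orders exists, as created in the Tables section.
  3. The staging database, its orders and regions tables, and the region_name column exist. These names are hypothetical.

```sql
-- Cross-catalog join: production orders joined with a reference table
-- in a local development catalog (local_catalog.staging.regions is hypothetical).
SELECT o.order_id, o.amount, r.region_name
FROM prod_iceberg.sales_data.orders AS o
JOIN local_catalog.staging.regions AS r
  ON o.region_id = r.region_id;

-- Migration: copy staged rows into the production table.
-- Assumes local_catalog.staging.orders (hypothetical) has matching column types.
INSERT INTO prod_iceberg.sales_data.orders
SELECT order_id, amount, order_date, region_id
FROM local_catalog.staging.orders;
```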


Catalogs

The Catalog is the entry point for external metadata. When you initialize Beam SQL, you start off with a default Catalog that contains a default Database. You can register new Catalogs, switch between them, and modify their configurations.

CREATE CATALOG

Registers a new Catalog instance.

Note: Creating a Catalog does not automatically switch to it. Run USE CATALOG afterwards to make it the active Catalog.

CREATE CATALOG [ IF NOT EXISTS ] catalog_name
TYPE 'type_name'
[ PROPERTIES ( 'key' = 'value' [, ...] ) ]

Example: Creating a Hadoop Catalog (Local Storage)

CREATE CATALOG local_catalog
TYPE iceberg
PROPERTIES (
  'type'      = 'hadoop',
  'warehouse' = 'file:///tmp/iceberg-warehouse'
);

Example: Registering a BigLake Catalog (GCS)

CREATE CATALOG prod_iceberg
TYPE iceberg
PROPERTIES (
  'type'           = 'rest',
  'uri'            = 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
  'warehouse'      = 'gs://my-company-bucket/warehouse',
  'header.x-goog-user-project' = 'my_prod_project',
  'rest.auth.type' = 'google',
  'io-impl'        = 'org.apache.iceberg.gcp.gcs.GCSFileIO',
  'header.X-Iceberg-Access-Delegation' = 'vended-credentials'
);

USE CATALOG

Sets the active Catalog for the current session. This simplifies queries because you can reference Databases without qualifying them with the Catalog name (e.g., my_db instead of my_catalog.my_db).

Tip: run SHOW CURRENT CATALOG to view the currently active Catalog.

Note: All subsequent DATABASE and TABLE commands will be executed under this Catalog, unless fully qualified.

USE CATALOG prod_iceberg;
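
The two queries below illustrate how the active Catalog affects name resolution. Both are expected to read the same table. This assumes the sales_data Database and orders Table from the later examples exist in prod_iceberg.

```sql
USE CATALOG prod_iceberg;

-- Fully qualified: catalog.database.table
SELECT * FROM prod_iceberg.sales_data.orders;

-- Catalog omitted: resolved against the active Catalog (prod_iceberg)
SELECT * FROM sales_data.orders;
```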

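SHOW CATALOGS / SHOW CURRENT CATALOG

These commands can be used to either:

  1. List the Catalogs registered in the current Beam SQL session (SHOW CATALOGS).
  2. View the currently active Catalog (SHOW CURRENT CATALOG).
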
SHOW CATALOGS [ LIKE pattern ]

Example: List all Catalogs

SHOW CATALOGS;

Example: List Catalogs matching a pattern

SHOW CATALOGS LIKE 'prod_%';

Example: Verify which Catalog is currently active

SHOW CURRENT CATALOG;

DROP CATALOG

Unregisters a Catalog from the current Beam SQL session. This does not destroy external data.

DROP CATALOG [ IF EXISTS ] catalog_name;
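
For example, the statement below unregisters the local_catalog Hadoop catalog from the earlier example. IF EXISTS avoids an error if the catalog has already been dropped. The files in its warehouse directory should remain in place.

```sql
-- Unregister the catalog; data under file:///tmp/iceberg-warehouse is not deleted.
DROP CATALOG IF EXISTS local_catalog;
```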

ALTER CATALOG

Modifies the properties of a registered Catalog.

ALTER CATALOG catalog_name
  [ SET ( 'key' = 'val', ... ) ]
  [ ( RESET | UNSET ) ( 'key', ... ) ]

  1. SET: Adds new properties or updates existing ones.
  2. RESET / UNSET: Removes properties.
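
The statements below are a minimal sketch of both clauses, run against the prod_iceberg catalog registered earlier. The new warehouse path is hypothetical.

```sql
-- Overwrite the existing 'warehouse' property (hypothetical new path).
ALTER CATALOG prod_iceberg SET ( 'warehouse' = 'gs://my-company-bucket/warehouse-v2' );

-- Remove the billing-project header from the catalog configuration.
ALTER CATALOG prod_iceberg RESET ( 'header.x-goog-user-project' );
```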

Databases

A Database lives inside a Catalog and can contain any number of Tables.

CREATE DATABASE

Creates a new Database in the currently active Catalog (the default) or in a specified Catalog.

Note: Creating a Database does not automatically switch to it. Run USE DATABASE afterwards to make it the active Database.

CREATE DATABASE [ IF NOT EXISTS ] [ catalog_name. ]database_name;

Example: Create a Database in the currently active Catalog

USE CATALOG my_catalog;
CREATE DATABASE sales_data;

Example: Create a Database in a specified registered Catalog

CREATE DATABASE other_catalog.sales_data;

USE DATABASE

Sets the active Database for the current session. This simplifies queries because you can reference Tables without their fully qualified names (e.g., my_table instead of my_catalog.my_db.my_table).

Note: All subsequent TABLE commands will be executed under this Database, unless fully qualified.

USE DATABASE sales_data;

Example: Switch to a Database in a specified Catalog. Note: this also makes that Catalog the active Catalog.

USE DATABASE other_catalog.sales_data;
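
The statements below sketch the effect of that switch, assuming a hypothetical orders table exists in other_catalog.sales_data. After the switch, unqualified table names should resolve against other_catalog.sales_data, and the active Catalog should report other_catalog.

```sql
USE DATABASE other_catalog.sales_data;

-- Should report other_catalog, since USE DATABASE also switched the Catalog.
SHOW CURRENT CATALOG;

-- Both statements refer to other_catalog.sales_data.orders (hypothetical table).
SELECT * FROM orders;
SELECT * FROM other_catalog.sales_data.orders;
```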

SHOW DATABASES / SHOW CURRENT DATABASE

These commands can be used to either:

  1. List the Databases in the currently active Catalog or in a specified Catalog (SHOW DATABASES).
  2. View the currently active Database (SHOW CURRENT DATABASE).

SHOW DATABASES [ [ FROM | IN ] catalog_name ] [ LIKE pattern ]

Example: List Databases in the currently active Catalog

SHOW DATABASES;

Example: List Databases in a specified Catalog

SHOW DATABASES IN my_catalog;

Example: List Databases matching a pattern

SHOW DATABASES IN my_catalog LIKE '%geo%';

Example: Verify which Database is currently active

SHOW CURRENT DATABASE;

DROP DATABASE

Unregisters a Database from the current session. For some connectors, this also deletes the Database from the external data source.

DROP DATABASE [ IF EXISTS ] database_name [ RESTRICT | CASCADE ];

  1. RESTRICT (default): Fails if the Database is not empty.
  2. CASCADE: Drops the Database and all Tables contained within it. Use with caution.
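
For example, using the sales_data Database from the earlier examples:

```sql
-- Fails if sales_data still contains tables (RESTRICT is the default).
DROP DATABASE sales_data;

-- Same behavior, with RESTRICT written out explicitly.
DROP DATABASE sales_data RESTRICT;

-- Drops sales_data and all of its tables; no error if it does not exist.
DROP DATABASE IF EXISTS sales_data CASCADE;
```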

Tables

A Table is the entity that holds the data and is described by a schema. Some data sources also support a partition spec and table-specific properties.

CREATE EXTERNAL TABLE

Creates a new Table in the currently active Catalog and Database (the default) or in a specified Catalog and Database.

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] [ [ catalog_name. ]database_name. ]table_name (
    col_name col_type [ NOT NULL ] [ COMMENT 'col_comment' ],
    ...
)
TYPE 'type_name'
[ PARTITIONED BY ( 'partition_field' [, ... ] ) ]
[ COMMENT 'table_comment' ]
[ LOCATION 'location_uri' ]
[ TBLPROPERTIES 'properties_json_string' ];

  1. TYPE: The table type (e.g., 'iceberg', 'text', 'kafka').
  2. PARTITIONED BY: An ordered list of fields describing the partition spec.
  3. LOCATION: Explicitly sets the location of the table, overriding the location inferred from catalog.db.table_name.
  4. TBLPROPERTIES: A JSON string of configuration properties used when creating the table or setting up its IO connection.
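
The Iceberg example below does not use LOCATION. The statement below sketches a table whose data lives at an explicit path. It assumes the active Catalog accepts the 'text' table type. The raw_events table, its columns, and the file path are hypothetical.

```sql
-- Hypothetical CSV-backed table: LOCATION points at the data directly
-- instead of relying on a location inferred from catalog.db.table_name.
CREATE EXTERNAL TABLE IF NOT EXISTS raw_events (
    event_id BIGINT NOT NULL COMMENT 'Event identifier',
    event_type VARCHAR,
    payload VARCHAR
)
TYPE 'text'
COMMENT 'Raw events exported as CSV'
LOCATION '/tmp/raw_events.csv';
```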

Example: Creating an Iceberg Table

CREATE EXTERNAL TABLE prod_iceberg.sales_data.orders (
    order_id BIGINT NOT NULL COMMENT 'Unique order identifier',
    amount DECIMAL(10, 2),
    order_date TIMESTAMP,
    region_id VARCHAR
)
TYPE 'iceberg'
PARTITIONED BY ( 'region_id', 'day(order_date)' )
COMMENT 'Daily sales transactions'
TBLPROPERTIES '{
    "write.format.default": "parquet",
    "read.split.target-size": "268435456",
    "beam.write.triggering_frequency_seconds": "60"
}';

  1. This creates an Iceberg table named orders in the sales_data namespace of the prod_iceberg catalog.
  2. The table is partitioned by region_id, then by the day of order_date (using Iceberg's hidden partitioning).
  3. The Iceberg table properties "write.format.default" and "read.split.target-size" are applied when the table is created.
  4. The Beam property "beam.write.triggering_frequency_seconds" is intended for Beam's write IO. In general, properties prefixed with "beam.write." or "beam.read." are intended for the corresponding Beam write or read IO.
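
Once created, the table can be queried like any other Beam SQL table. The query below is a minimal sketch that aggregates the orders table above. The cutoff timestamp is arbitrary.

```sql
-- Total sales per region since the start of 2024.
SELECT region_id, SUM(amount) AS total_amount
FROM prod_iceberg.sales_data.orders
WHERE order_date >= TIMESTAMP '2024-01-01 00:00:00'
GROUP BY region_id;
```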

ALTER TABLE

Modifies an existing Table's properties and evolves its schema and partition spec.

ALTER TABLE table_name
    [ ADD COLUMNS ( col_def, ... ) ]
    [ DROP COLUMNS ( col_name, ... ) ]
    [ ADD PARTITIONS ( partition_field, ... ) ]
    [ DROP PARTITIONS ( partition_field, ... ) ]
    [ SET ( 'key' = 'val', ... ) ]
    [ ( RESET | UNSET ) ( 'key', ... ) ];

Example: Add or remove columns

-- Add columns
ALTER TABLE orders ADD COLUMNS (
    customer_email VARCHAR,
    shipping_region VARCHAR
);

-- Drop columns
ALTER TABLE orders DROP COLUMNS ( customer_email );

Example: Modify partition spec

-- Add a partition field
ALTER TABLE orders ADD PARTITIONS ( 'year(order_date)' );

-- Remove a partition field
ALTER TABLE orders DROP PARTITIONS ( 'region_id' );

Example: Modify table properties

ALTER TABLE orders SET (
    'write.format.default' = 'orc',
    'write.metadata.metrics.default' = 'full'
);
ALTER TABLE orders RESET ( 'write.target-file-size-bytes' );

SHOW TABLES

Lists the Tables in the currently active Database or in a specified Database.

SHOW TABLES [ [ FROM | IN ] [ catalog_name. ]database_name ] [ LIKE pattern ]

Example: List tables in the currently active database and catalog

SHOW TABLES;

Example: List tables in a specified database

SHOW TABLES IN my_db;
SHOW TABLES IN my_catalog.my_db;

Example: List tables matching a pattern

SHOW TABLES IN my_db LIKE '%orders%';

DROP TABLE

Unregisters a Table from the current session. For supported connectors, this also deletes the table from the external data source.

DROP TABLE [ IF EXISTS ] table_name;
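
For example, the statements below drop the orders table from the Iceberg example. The unqualified name is resolved through USE DATABASE, and IF EXISTS makes the statement safe to re-run.

```sql
-- Resolve unqualified names against prod_iceberg.sales_data.
USE DATABASE prod_iceberg.sales_data;

-- No error if orders has already been dropped.
DROP TABLE IF EXISTS orders;
```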