Apache Iceberg I/O connector
The Beam SDKs include built-in transforms that can read data from and write data to Apache Iceberg tables.
IcebergIO is available through the following interfaces:
- SQL Shell
- Java SDK
- Python SDK
- YAML API
To use IcebergIO from the Java SDK, add the Maven artifact dependency to your pom.xml file. To use IcebergIO from the SQL Shell, install the Beam SQL Shell and run the following command:
./beam-sql.sh --io
Iceberg basics
Catalogs
A catalog is a top-level entity used to manage and access Iceberg tables. There are many catalog implementations out there; this guide focuses on the Hadoop catalog for easy local testing and BigLake REST catalog for cloud-scale development.
Namespaces
A namespace lives inside a catalog and may contain a number of Iceberg tables. This is the equivalent of a “database”.
Tables
A table is the entity that actually contains data; it is described by a schema and a partition spec.
Snapshots
A new snapshot is created whenever a change is made to an Iceberg table. Each snapshot provides a summary of the change and references its parent snapshot. An Iceberg table’s history is a chronological list of snapshots, enabling features like time travel and ACID-compliant concurrent writes.
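The snapshot chain described above can be illustrated with a small sketch (plain Python, illustrative only, not the Iceberg API): each snapshot records a summary and a pointer to its parent, and walking the parent pointers from the current snapshot reconstructs the table's history.

```python
# Illustrative model of an Iceberg snapshot chain (not the real Iceberg API).
# Each snapshot summarizes one change and references its parent snapshot.
snapshots = {
    1: {"parent_id": None, "summary": "append: 100 rows"},
    2: {"parent_id": 1, "summary": "append: 50 rows"},
    3: {"parent_id": 2, "summary": "delete: 10 rows"},
}

def history(snapshot_id):
    """Walk parent pointers to list the table's history, oldest first."""
    chain = []
    current = snapshot_id
    while current is not None:
        chain.append(current)
        current = snapshots[current]["parent_id"]
    return list(reversed(chain))

print(history(3))  # -> [1, 2, 3]
```

Time travel amounts to reading the table as of any snapshot in this list rather than only the latest one.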
Quickstart Guide
Choose Your Catalog
First, select a Catalog implementation to handle metadata management and storage interaction. Beam supports a wide variety of Iceberg catalogs, but this guide focuses on two common paths: Hadoop for easy local development and BigLake for managing production data at cloud scale.
Use Hadoop Catalog for quick, local testing with zero setup and no external dependencies. The following examples use a temporary local directory.
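For example, a minimal Hadoop catalog configuration might look like the sketch below. The `table`/`catalog_name`/`catalog_properties` keys follow the Managed IcebergIO config shape; the catalog and table names are placeholders, and the warehouse is a throwaway temp directory.

```python
import tempfile

# A throwaway local directory serves as the warehouse -- no external services.
warehouse_path = tempfile.mkdtemp(prefix="iceberg_warehouse_")

# Sketch of a Managed IcebergIO config using a Hadoop catalog
# ("my_catalog" and "my_db.my_table" are placeholder names).
hadoop_config = {
    "table": "my_db.my_table",
    "catalog_name": "my_catalog",
    "catalog_properties": {
        "type": "hadoop",
        "warehouse": warehouse_path,
    },
}
```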
Use BigLake Catalog for a fully managed REST-based experience. It simplifies access to cloud storage with built-in credential delegation and unified metadata management. It requires a few prerequisites:
- A Google Cloud project (for authentication); create one if you don't have one.
- Standard Google Application Default Credentials (ADC) set up in your environment.
- A Google Cloud Storage bucket
CREATE CATALOG my_catalog TYPE 'iceberg'
PROPERTIES (
'type' = 'rest',
'uri' = 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
'warehouse' = 'gs://$BUCKET_NAME',
'header.x-goog-user-project' = '$PROJECT_ID',
'rest.auth.type' = 'google',
'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO',
'header.X-Iceberg-Access-Delegation' = 'vended-credentials'
);
Map<String, String> catalogProps =
ImmutableMap.of(
"type", "rest",
"uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog",
"warehouse", "gs://$BUCKET_NAME",
"header.x-goog-user-project", "$PROJECT_ID",
"rest.auth.type", "google",
"io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO",
"header.X-Iceberg-Access-Delegation", "vended-credentials");
biglake_catalog_config = {
'type': 'rest',
'uri': 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
'warehouse': 'gs://$BUCKET_NAME',
'header.x-goog-user-project': '$PROJECT_ID',
'rest.auth.type': 'google',
'io-impl': 'org.apache.iceberg.gcp.gcs.GCSFileIO',
'header.X-Iceberg-Access-Delegation': 'vended-credentials'
}
catalog_props: &catalog_props
type: "rest"
uri: "https://biglake.googleapis.com/iceberg/v1/restcatalog"
warehouse: "gs://$BUCKET_NAME"
header.x-goog-user-project: "$PROJECT_ID"
rest.auth.type: "google"
io-impl: "org.apache.iceberg.gcp.gcs.GCSFileIO"
header.X-Iceberg-Access-Delegation: "vended-credentials"
Create a Namespace
You can use Beam SQL to create a new namespace through an explicit DDL statement:
CREATE DATABASE my_catalog.my_db;
Alternatively, the IcebergIO sink can handle namespace creation automatically at runtime. This is ideal for dynamic pipelines where destinations are determined by the incoming data.
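For instance, the destination can be expressed as a template that is resolved per record, so records fan out to different namespaces and tables as they arrive. The `{field}` template syntax below is an assumption about the Managed Iceberg dynamic-destinations config; the toy resolver only mirrors how such a template would expand.

```python
# Sketch: a destination template resolved per record. The "{region}" token
# is assumed to be filled from each record's "region" field, so records
# fan out to per-region tables, with missing namespaces created on the fly.
dynamic_config = {
    "table": "logs_{region}.events",
    "catalog_name": "my_catalog",  # placeholder
}

def resolve(template, record):
    """Toy resolver mirroring how a templated destination would expand."""
    return template.format(**record)

print(resolve(dynamic_config["table"], {"region": "us", "payload": "..."}))
# -> logs_us.events
```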
Create a Table
Tables are defined by a schema and an optional partition spec. You can create a table using SQL DDL or by configuring the Iceberg destination in your Beam pipeline.
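Table creation can also happen implicitly on first write: if the target table does not exist, the sink derives its schema from the incoming records. The sketch below assumes a `partition_fields` key for declaring the partition spec; verify the exact key name against the full IcebergIO configuration reference.

```python
# Sketch: letting the sink create the table on first write.
# "partition_fields" is an assumed key for the partition spec here;
# check it against the Managed IcebergIO configuration reference.
create_on_write_config = {
    "table": "my_db.people",       # created if absent
    "catalog_name": "my_catalog",  # placeholder
    "catalog_properties": {
        "type": "hadoop",
        "warehouse": "/tmp/iceberg_warehouse",
    },
    "partition_fields": ["age"],
}
```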
Insert Data
Once your table is defined, you can write data using standard SQL INSERT or by calling the IcebergIO sink in your SDK of choice.
Schema inputSchema =
Schema.builder().addInt64Field("id").addStringField("name").addInt32Field("age").build();
Pipeline p = Pipeline.create();
p.apply(
Create.of(
Row.withSchema(inputSchema).addValues(1, "Mark", 34).build(),
Row.withSchema(inputSchema).addValues(2, "Omar", 24).build(),
Row.withSchema(inputSchema).addValues(3, "Rachel", 27).build()))
.apply(Managed.write("iceberg").withConfig(managedConfig));
p.run();
View Namespaces and Tables
You can use Beam SQL to view the newly created resources:
SHOW DATABASES my_catalog;
SHOW TABLES my_catalog.my_db;
Query Data
You can query the table with a standard SQL SELECT, for example:
SELECT * FROM my_catalog.my_db.my_table;
Data Types
See the Apache Iceberg documentation for an overview of Iceberg data types.
IcebergIO leverages Beam Schemas to bridge the gap between SDK-native types and the Iceberg specification. While the Java SDK provides full coverage for the Iceberg v2 spec (with v3 support currently in development), other SDKs may have specific constraints on complex or experimental types. The following examples demonstrate the standard mapping for core data types across SQL, Java, Python, and YAML:
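As a quick reference, the core correspondence between Beam schema field types and Iceberg types can be summarized as follows. This table is assembled from the mappings exercised in the examples in this section; consult the IcebergIO documentation for the authoritative list.

```python
# Core correspondence between Beam schema field types and Iceberg types,
# summarized for reference (keys name Beam field/logical types as strings).
BEAM_TO_ICEBERG = {
    "BOOLEAN": "boolean",
    "INT32": "int",
    "INT64": "long",
    "FLOAT": "float",
    "DOUBLE": "double",
    "DECIMAL": "decimal",
    "STRING": "string",
    "BYTES": "binary",
    "SqlTypes.DATE": "date",
    "SqlTypes.TIME": "time",
    "Timestamp.MICROS": "timestamp",
    "DATETIME": "timestamptz",
    "ARRAY": "list",
    "MAP": "map",
    "ROW": "struct",
}
```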
INSERT INTO catalog.namespace.table VALUES (
9223372036854775807, -- BIGINT
2147483647, -- INTEGER
1.0, -- FLOAT
1.0, -- DOUBLE
TRUE, -- BOOLEAN
TIMESTAMP '2018-05-28 20:17:40.123', -- TIMESTAMP
'varchar', -- VARCHAR
'char', -- CHAR
ARRAY['abc', 'xyz'], -- ARRAY
ARRAY[CAST(ROW('abc', 123) AS ROW(nested_str VARCHAR, nested_int INTEGER))] -- ARRAY[STRUCT]
)
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Arrays;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;
public class IcebergBeamSchemaAndRow {
Schema nestedSchema =
Schema.builder().addStringField("nested_field").addInt32Field("nested_field_2").build();
Schema beamSchema =
Schema.builder()
.addBooleanField("boolean_field")
.addInt32Field("int_field")
.addInt64Field("long_field")
.addFloatField("float_field")
.addDoubleField("double_field")
.addDecimalField("numeric_field")
.addByteArrayField("bytes_field")
.addStringField("string_field")
.addLogicalTypeField("time_field", SqlTypes.TIME)
.addLogicalTypeField("date_field", SqlTypes.DATE)
.addLogicalTypeField("timestamp_field", Timestamp.MICROS)
.addDateTimeField("timestamptz_field")
.addArrayField("array_field", Schema.FieldType.INT32)
.addMapField("map_field", Schema.FieldType.STRING, Schema.FieldType.INT32)
.addRowField("struct_field", nestedSchema)
.build();
Row beamRow =
Row.withSchema(beamSchema)
.withFieldValues(
ImmutableMap.<String, Object>builder()
.put("boolean_field", true)
.put("int_field", 1)
.put("long_field", 2L)
.put("float_field", 3.4f)
.put("double_field", 4.5d)
.put("numeric_field", new BigDecimal(67))
.put("bytes_field", new byte[] {1, 2, 3})
.put("string_field", "value")
.put("time_field", LocalTime.now())
.put("date_field", LocalDate.now())
.put("timestamp_field", Instant.now())
.put("timestamptz_field", DateTime.now())
.put("array_field", Arrays.asList(1, 2, 3))
.put("map_field", ImmutableMap.of("a", 1, "b", 2))
.put(
"struct_field",
Row.withSchema(nestedSchema).addValues("nested_value", 123).build())
.build())
.build();
}
from decimal import Decimal
import apache_beam as beam
from apache_beam.utils.timestamp import Timestamp
row = beam.Row(
boolean_field=True,
int_field=1,
float_field=2.3,
numeric_field=Decimal('34'),
bytes_field=b'value',
string_field="value",
timestamptz_field=Timestamp(4, 5),
array_field=[1, 2, 3],
map_field={
"a": 1, "b": 2
},
struct_field=beam.Row(nested_field="nested_value", nested_field2=123))
pipeline:
transforms:
- type: Create
config:
elements:
- boolean_field: false
integer_field: 123
number_field: 4.56
string_field: "abc"
struct_field:
nested_1: a
nested_2: 1
array_field: [1, 2, 3]
output_schema:
type: object
properties:
boolean_field:
type: boolean
integer_field:
type: integer
number_field:
type: number
string_field:
type: string
struct_field:
type: object
properties:
nested_1:
type: string
nested_2:
type: integer
array_field:
type: array
items:
type: integer
Further steps
Check out the full IcebergIO configuration to make use of other features like applying a partition spec, table properties, row filtering, column pruning, etc.
Last updated on 2026/03/03