Built-in I/O Transforms

Apache Iceberg I/O connector

The Beam SDKs include built-in transforms that can read data from and write data to Apache Iceberg tables.

To use IcebergIO in a Java pipeline, add the Maven artifact dependency to your pom.xml file:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>2.71.0</version>
</dependency>

Alternatively, to use IcebergIO from SQL, install the Beam SQL shell and run the following command:

./beam-sql.sh --io

Iceberg basics

Catalogs

A catalog is a top-level entity used to manage and access Iceberg tables. There are many catalog implementations out there; this guide focuses on the Hadoop catalog for easy local testing and BigLake REST catalog for cloud-scale development.

Namespaces

A namespace lives inside a catalog and may contain a number of Iceberg tables. This is the equivalent of a “database”.

Tables

A table is the entity that actually contains the data; it is described by a schema and a partition spec.

Snapshots

A new snapshot is created whenever a change is made to an Iceberg table. Each snapshot provides a summary of the change and references its parent snapshot. An Iceberg table’s history is a chronological list of snapshots, enabling features like time travel and ACID-compliant concurrent writes.
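The parent-pointer chain described above can be sketched in a few lines of plain Python. This is an illustrative model only, not the Iceberg API: each snapshot records a summary of its change and a reference to its parent, and walking the chain recovers the table's history.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Snapshot:
    """Illustrative model of an Iceberg snapshot: a change summary plus a parent link."""
    snapshot_id: int
    operation: str                      # e.g. "append", "overwrite"
    parent: Optional["Snapshot"] = None

def history(tip: Snapshot) -> list:
    """Walk the parent pointers to recover the table's chronological history."""
    ops = []
    node = tip
    while node is not None:
        ops.append(node.operation)
        node = node.parent
    return list(reversed(ops))

s1 = Snapshot(1, "append")                # initial write
s2 = Snapshot(2, "append", parent=s1)     # second write
s3 = Snapshot(3, "overwrite", parent=s2)  # later overwrite
print(history(s3))  # ['append', 'append', 'overwrite']
```

Time travel amounts to reading the table as of any snapshot in this chain rather than the latest one.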

Quickstart Guide

Choose Your Catalog

First, select a Catalog implementation to handle metadata management and storage interaction. Beam supports a wide variety of Iceberg catalogs, but this guide focuses on two common paths: Hadoop for easy local development and BigLake for managing production data at cloud scale.

Use Hadoop Catalog for quick, local testing with zero setup and no external dependencies. The following examples use a temporary local directory.


SQL:

    CREATE CATALOG my_catalog TYPE 'iceberg'
    PROPERTIES (
      'type' = 'hadoop',
      'warehouse' = 'file:///tmp/beam-iceberg-local-quickstart'
    );
  
    
Java:

    Map<String, String> catalogProps =
        ImmutableMap.of(
            "type", "hadoop",
            "warehouse", "file:///tmp/beam-iceberg-local-quickstart");

  
    
Python:

    hadoop_catalog_props = {
        'type': 'hadoop', 'warehouse': 'file:///tmp/beam-iceberg-local-quickstart'
    }

  

Use BigLake Catalog for a fully managed REST-based experience. It simplifies access to cloud storage with built-in credential delegation and unified metadata management. You will need a Google Cloud project and a Cloud Storage bucket, referenced in the examples below as $PROJECT_ID and $BUCKET_NAME.

SQL:

  CREATE CATALOG my_catalog TYPE 'iceberg'
  PROPERTIES (
    'type' = 'rest',
    'uri' = 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
    'warehouse' = 'gs://$BUCKET_NAME',
    'header.x-goog-user-project' = '$PROJECT_ID',
    'rest.auth.type' = 'google',
    'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO',
    'header.X-Iceberg-Access-Delegation' = 'vended-credentials'
  );
  
  
Java:

  Map<String, String> catalogProps =
      ImmutableMap.of(
          "type", "rest",
          "uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog",
          "warehouse", "gs://$BUCKET_NAME",
          "header.x-goog-user-project", "$PROJECT_ID",
          "rest.auth.type", "google",
          "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO",
          "header.X-Iceberg-Access-Delegation", "vended-credentials");

  
  
Python:

  biglake_catalog_config = {
      'type': 'rest',
      'uri': 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
      'warehouse': 'gs://$BUCKET_NAME',
      'header.x-goog-user-project': '$PROJECT_ID',
      'rest.auth.type': 'google',
      'io-impl': 'org.apache.iceberg.gcp.gcs.GCSFileIO',
      'header.X-Iceberg-Access-Delegation': 'vended-credentials'
  }

  
YAML:

  catalog_props: &catalog_props
    type: "rest"
    uri: "https://biglake.googleapis.com/iceberg/v1/restcatalog"
    warehouse: "gs://$BUCKET_NAME"
    header.x-goog-user-project: "$PROJECT_ID"
    rest.auth.type: "google"
    io-impl: "org.apache.iceberg.gcp.gcs.GCSFileIO"
    header.X-Iceberg-Access-Delegation: "vended-credentials"
  

Create a Namespace

You can use Beam SQL to create a new namespace through an explicit DDL statement:

CREATE DATABASE my_catalog.my_db;

Alternatively, the IcebergIO sink can create namespaces automatically at runtime. This is ideal for dynamic pipelines where destinations are determined by the incoming data.
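For example, a write config can route records to destinations derived from the data itself. The `department` field and the table-name template below are hypothetical; check the IcebergIO documentation on dynamic destinations for the exact placeholder syntax.

```python
# Hypothetical sketch: the sink resolves {department} per record, creating
# namespaces and tables on the fly as new values arrive.
dynamic_write_config = {
    'table': 'my_db.orders_{department}',  # templated destination (assumed syntax)
    'catalog_properties': {
        'type': 'hadoop',
        'warehouse': 'file:///tmp/beam-iceberg-local-quickstart',
    },
}
```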

Create a Table

Tables are defined by a schema and an optional partition spec. You can create a table using SQL DDL or by configuring the Iceberg destination in your Beam pipeline.

SQL:

CREATE EXTERNAL TABLE my_catalog.my_db.my_table (
    id BIGINT,
    name VARCHAR,
    age INTEGER
)
TYPE 'iceberg'
Java:

Map<String, Object> managedConfig =
    ImmutableMap.of("table", "my_db.my_table", "catalog_properties", catalogProps);
// Note: The table will get created when inserting data (see below)
Python:

managed_config = {
    'table': 'my_db.my_table', 'catalog_properties': hadoop_catalog_props
}
# Note: The table will get created when inserting data (see below)
YAML:

- type: WriteToIceberg
  config:
    table: "my_db.my_table"
    catalog_properties: *catalog_props

# Note: The table will get created when inserting data (see below)

Insert Data

Once your table is defined, you can write data using standard SQL INSERT or by calling the IcebergIO sink in your SDK of choice.

SQL:

INSERT INTO my_catalog.my_db.my_table VALUES
    (1, 'Mark', 32),
    (2, 'Omar', 24),
    (3, 'Rachel', 27);
Java:

Schema inputSchema =
    Schema.builder().addInt64Field("id").addStringField("name").addInt32Field("age").build();

Pipeline p = Pipeline.create();
p.apply(
        Create.of(
            Row.withSchema(inputSchema).addValues(1, "Mark", 32).build(),
            Row.withSchema(inputSchema).addValues(2, "Omar", 24).build(),
            Row.withSchema(inputSchema).addValues(3, "Rachel", 27).build()))
    .apply(Managed.write("iceberg").withConfig(managedConfig));

p.run();
Python:

with beam.Pipeline() as p:
  (
      p
      | beam.Create([
          beam.Row(id=1, name="Mark", age=32),
          beam.Row(id=2, name="Omar", age=24),
          beam.Row(id=3, name="Rachel", age=27)
      ])
      | beam.managed.Write("iceberg", config=managed_config))
YAML:

pipeline:
  type: chain
  transforms:
    - type: Create
      config:
        elements:
          - id: 1
            name: "Mark"
            age: 32
          - id: 2
            name: "Omar"
            age: 24
          - id: 3
            name: "Rachel"
            age: 27
    - type: WriteToIceberg
      config:
        table: "my_db.my_table"
        catalog_properties: *catalog_props

View Namespaces and Tables

You can use Beam SQL to view the newly created resources:

SHOW DATABASES my_catalog;
SHOW TABLES my_catalog.my_db;

Query Data

SQL:

SELECT * FROM my_catalog.my_db.my_table;
Java:

Pipeline q = Pipeline.create();
PCollection<Row> rows =
    q.apply(Managed.read("iceberg").withConfig(managedConfig)).getSinglePCollection();

rows.apply(
    MapElements.into(TypeDescriptors.voids())
        .via(
            row -> {
              System.out.println(row);
              return null;
            }));

q.run();
Python:

with beam.Pipeline() as p:
  (
      p
      | beam.managed.Read("iceberg", config=managed_config)
      | beam.LogElements())
YAML:

pipeline:
  type: chain
  transforms:
    - type: ReadFromIceberg
      config:
        table: "my_db.my_table"
        catalog_properties: *catalog_props
    - type: LogForTesting

Data Types

See the Apache Iceberg documentation for an overview of Iceberg data types.

IcebergIO leverages Beam Schemas to bridge the gap between SDK-native types and the Iceberg specification. While the Java SDK provides full coverage for the Iceberg v2 spec (with v3 support currently in development), other SDKs may have specific constraints on complex or experimental types. The following examples demonstrate the standard mapping for core data types across SQL, Java, Python, and YAML:

SQL:

INSERT INTO catalog.namespace.table VALUES (
9223372036854775807, -- BIGINT
2147483647,          -- INTEGER
1.0,                 -- FLOAT
1.0,                 -- DOUBLE
TRUE,                -- BOOLEAN
TIMESTAMP '2018-05-28 20:17:40.123', -- TIMESTAMP
'varchar',           -- VARCHAR
'char',              -- CHAR
ARRAY['abc', 'xyz'],  -- ARRAY
ARRAY[CAST(ROW('abc', 123) AS ROW(nested_str VARCHAR, nested_int INTEGER))] -- ARRAY[STRUCT]
)
Java:

import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Arrays;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;

public class IcebergBeamSchemaAndRow {
  Schema nestedSchema =
      Schema.builder().addStringField("nested_field").addInt32Field("nested_field_2").build();
  Schema beamSchema =
      Schema.builder()
          .addBooleanField("boolean_field")
          .addInt32Field("int_field")
          .addInt64Field("long_field")
          .addFloatField("float_field")
          .addDoubleField("double_field")
          .addDecimalField("numeric_field")
          .addByteArrayField("bytes_field")
          .addStringField("string_field")
          .addLogicalTypeField("time_field", SqlTypes.TIME)
          .addLogicalTypeField("date_field", SqlTypes.DATE)
          .addLogicalTypeField("timestamp_field", Timestamp.MICROS)
          .addDateTimeField("timestamptz_field")
          .addArrayField("array_field", Schema.FieldType.INT32)
          .addMapField("map_field", Schema.FieldType.STRING, Schema.FieldType.INT32)
          .addRowField("struct_field", nestedSchema)
          .build();

  Row beamRow =
      Row.withSchema(beamSchema)
          .withFieldValues(
              ImmutableMap.<String, Object>builder()
                  .put("boolean_field", true)
                  .put("int_field", 1)
                  .put("long_field", 2L)
                  .put("float_field", 3.4f)
                  .put("double_field", 4.5d)
                  .put("numeric_field", new BigDecimal(67))
                  .put("bytes_field", new byte[] {1, 2, 3})
                  .put("string_field", "value")
                  .put("time_field", LocalTime.now())
                  .put("date_field", LocalDate.now())
                  .put("timestamp_field", Instant.now())
                  .put("timestamptz_field", DateTime.now())
                  .put("array_field", Arrays.asList(1, 2, 3))
                  .put("map_field", ImmutableMap.of("a", 1, "b", 2))
                  .put(
                      "struct_field",
                      Row.withSchema(nestedSchema).addValues("nested_value", 123).build())
                  .build())
          .build();
}
Python:

from decimal import Decimal
import apache_beam as beam
from apache_beam.utils.timestamp import Timestamp

row = beam.Row(
    boolean_field=True,
    int_field=1,
    float_field=2.3,
    numeric_field=Decimal('34'),
    bytes_field=b'value',
    string_field="value",
    timestamptz_field=Timestamp(4, 5),
    array_field=[1, 2, 3],
    map_field={
        "a": 1, "b": 2
    },
    struct_field=beam.Row(nested_field="nested_value", nested_field2=123))
YAML:

pipeline:
  transforms:
    - type: Create
      config:
        elements:
          - boolean_field: false
            integer_field: 123
            number_field: 4.56
            string_field: "abc"
            struct_field:
              nested_1: a
              nested_2: 1
            array_field: [1, 2, 3]
        output_schema:
          type: object
          properties:
            boolean_field:
              type: boolean
            integer_field:
              type: integer
            number_field:
              type: number
            string_field:
              type: string
            struct_field:
              type: object
              properties:
                nested_1:
                  type: string
                nested_2:
                  type: integer
            array_field:
              type: array
              items:
                type: integer
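Pulling the examples above together, the core Beam-to-Iceberg type correspondence can be summarized in one table. This is a convenience summary inferred from the Java example above, not the normative mapping; consult the IcebergIO documentation for the authoritative table.

```python
# Beam schema field type -> Iceberg type, as used in the examples above.
BEAM_TO_ICEBERG = {
    'BOOLEAN': 'boolean',
    'INT32': 'int',
    'INT64': 'long',
    'FLOAT': 'float',
    'DOUBLE': 'double',
    'DECIMAL': 'decimal',
    'BYTES': 'binary',
    'STRING': 'string',
    'SqlTypes.DATE': 'date',
    'SqlTypes.TIME': 'time',
    'Timestamp.MICROS': 'timestamp',
    'DATETIME': 'timestamptz',
    'ARRAY<T>': 'list<T>',
    'MAP<K, V>': 'map<K, V>',
    'ROW': 'struct',
}
```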

Further steps

Check out the full IcebergIO configuration to make use of other features like applying a partition spec, table properties, row filtering, column pruning, etc.
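As an illustration of the kinds of options the full configuration covers, here is a hedged sketch of write and read configs. All option names beyond `table` and `catalog_properties` are assumptions drawn from the feature list above; verify them against the IcebergIO configuration reference before use.

```python
# Sketch only: the partition/filter/pruning key names below are assumptions.
hadoop_catalog_props = {
    'type': 'hadoop', 'warehouse': 'file:///tmp/beam-iceberg-local-quickstart'
}

write_config = {
    'table': 'my_db.my_table',
    'catalog_properties': hadoop_catalog_props,
    'partition_fields': ['age'],                              # partition spec (assumed key)
    'table_properties': {'write.format.default': 'parquet'},  # table properties (assumed key)
}

read_config = {
    'table': 'my_db.my_table',
    'catalog_properties': hadoop_catalog_props,
    'filter': 'age > 25',       # row filtering (assumed key)
    'keep': ['id', 'name'],     # column pruning (assumed key)
}
```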