[DO NOT MERGE] Experimental Iceberg sharding runs #34020

Draft · wants to merge 2 commits into base: master
@@ -64,8 +64,9 @@ message ManagedTransforms {
enum Urns {
ICEBERG_READ = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:iceberg_read:v1"];
// Modifying this so that the Dataflow service doesn't override the transform
ICEBERG_WRITE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:iceberg_write:v1"];
"beam:schematransform:org.apache.beam:iceberg_write:v2"];
KAFKA_READ = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:kafka_read:v1"];
KAFKA_WRITE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
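(The URN bump from v1 to v2 presumably keeps the Dataflow service, which recognizes managed transforms by URN, from substituting its own Iceberg write and bypassing the experimental sharding code below.)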
@@ -19,15 +19,20 @@

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
@@ -82,19 +87,48 @@ public IcebergWriteResult expand(PCollection<KV<String, Row>> input) {
private PCollection<FileWriteResult> writeTriggered(PCollection<KV<String, Row>> input) {
checkArgumentNotNull(
triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
int numShards =
Integer.parseInt(
checkArgumentNotNull(
checkArgumentNotNull(catalogConfig.getCatalogProperties()).get("num_shards")));

// Group records into batches to avoid writing thousands of small files
PCollection<KV<ShardedKey<String>, Iterable<Row>>> groupedRecords =
input
.apply("WindowIntoGlobal", Window.into(new GlobalWindows()))
// We rely on GroupIntoBatches to group and parallelize records properly,
// respecting our thresholds for number of records and bytes per batch.
// Each output batch will be written to a file.
.apply(
GroupIntoBatches.<String, Row>ofSize(FILE_TRIGGERING_RECORD_COUNT)
"AddShard",
ParDo.of(
new DoFn<KV<String, Row>, KV<ShardedKey<String>, Row>>() {
int shardNumber;
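// Each DoFn instance starts at a random shard (see setup()) and round-robins from there,
// so a single destination's records get spread across numShards grouping keys.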

@Setup
public void setup() {
shardNumber = ThreadLocalRandom.current().nextInt(numShards);
}

@ProcessElement
public void processElement(
@Element KV<String, Row> element,
OutputReceiver<KV<ShardedKey<String>, Row>> o) {
String destination = element.getKey();
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
buffer.putInt(++shardNumber % numShards);
o.output(
KV.of(ShardedKey.of(destination, buffer.array()), element.getValue()));
}
}))
.setCoder(
KvCoder.of(
org.apache.beam.sdk.util.ShardedKey.Coder.of(StringUtf8Coder.of()),
SchemaCoder.of(dynamicDestinations.getDataSchema())))
.apply(
GroupIntoBatches.<ShardedKey<String>, Row>ofSize(FILE_TRIGGERING_RECORD_COUNT)
.withByteSize(FILE_TRIGGERING_BYTE_COUNT)
.withMaxBufferingDuration(checkArgumentNotNull(triggeringFrequency))
.withShardedKey())
.withMaxBufferingDuration(checkArgumentNotNull(triggeringFrequency)))
.setCoder(
KvCoder.of(
org.apache.beam.sdk.util.ShardedKey.Coder.of(StringUtf8Coder.of()),
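The change above swaps the runner-decided sharding of GroupIntoBatches.withShardedKey() for explicit sharding: an AddShard step assigns every record a shard drawn from the user-supplied num_shards, so write parallelism per destination is controlled directly rather than left to the runner.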
3 changes: 2 additions & 1 deletion settings.gradle.kts
@@ -360,4 +360,5 @@ findProject(":sdks:java:extensions:combiners")?.name = "combiners"
include("sdks:java:io:iceberg:hive")
findProject(":sdks:java:io:iceberg:hive")?.name = "hive"
include("sdks:java:io:iceberg:bqms")
findProject(":sdks:java:io:iceberg:bqms")?.name = "bqms"
findProject(":sdks:java:io:iceberg:bqms")?.name = "bqms"
include("temp")
32 changes: 32 additions & 0 deletions temp/build.gradle
@@ -0,0 +1,32 @@
apply plugin: 'org.apache.beam.module'
apply plugin: 'application'

applyJavaNature(
automaticModuleName: 'org.apache.beam.temp',
exportJavadoc: false,
)

def iceberg_version = "1.6.1"

dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ':sdks:java:io:synthetic')
implementation project(path: ':sdks:java:io:iceberg')
implementation project(path: ':sdks:java:io:iceberg:bqms')
implementation "org.apache.iceberg:iceberg-core:$iceberg_version"
implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
implementation project(path: ':sdks:java:extensions:google-cloud-platform-core')
implementation project(path: ':runners:google-cloud-dataflow-java')
implementation project(path: ':sdks:java:managed')
implementation project(":sdks:java:io:google-cloud-platform")
implementation project(path: ":runners:direct-java", configuration: "shadow")
implementation library.java.hadoop_common
implementation library.java.vendored_guava_32_1_2_jre
}

tasks.create(name:"execute", type:JavaExec) {
main = project.hasProperty("mainClass") ? project.getProperty("mainClass") : "NONE"
classpath = sourceSets.main.runtimeClasspath
systemProperties System.getProperties()
args project.hasProperty("exec.args") ? project.getProperty("exec.args").split() : []
}
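With the execute task above in place, the benchmark below can be launched with something along the lines of ./gradlew :temp:execute -PmainClass=org.apache.beam.temp.IcebergShardingRuns, passing program arguments via -Pexec.args if needed.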
166 changes: 166 additions & 0 deletions temp/src/main/java/org/apache/beam/temp/IcebergShardingRuns.java
@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.temp;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.iceberg.IcebergUtils;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

public class IcebergShardingRuns {
static String PROJECT = "apache-beam-testing";
static String DATASET = "ahmedabualsaud";
static String WAREHOUSE = "gs://ahmedabualsaud-apache-beam-testing";
static String BQMS_CATALOG = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog";
static Schema SCHEMA = Schema.builder().addByteArrayField("bytes").build();

public static void main(String[] args) throws IOException {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject(PROJECT);
options.setRegion("us-central1");
options.setRunner(DataflowRunner.class);
options.setExperiments(Arrays.asList("use_runner_v2", GcpOptions.STREAMING_ENGINE_EXPERIMENT));

// ======== experiment with these numbers ===========
int numShards = 1;
long payloadSize = 1 << 10; // 1KB
int numIcebergPartitions = 0;
// ==================================================

long numRecords = 5_000_000L;
int initialSplits = 20;
long recordsPerSecond = 1_000;
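// Rough arithmetic for mbps below: 1 KiB payloads at 1,000 records/s ≈ 0.98 MiB/s, which rounds to 1 with the defaults above.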
long mbps = Math.round((double) payloadSize * recordsPerSecond / (1 << 20));
String name =
String.format(
"test-iceberg-bqms-%sshards-%smbps-%spartitions-%s",
numShards, mbps, numIcebergPartitions, System.nanoTime());
options.setJobName(name);
String tableId = String.format("%s.%s", DATASET, name);
if (numIcebergPartitions > 1) {
createTableWithPartitions(tableId, numIcebergPartitions);
}

long rowsPerSecondPerSplit = recordsPerSecond / initialSplits;
double delayMillis = 1000d / rowsPerSecondPerSplit;
SyntheticSourceOptions syntheticSourceOptions =
SyntheticSourceOptions.fromJsonString(
String.format(
"{\"numRecords\":%s, \"valueSizeBytes\":%s, "
+ "\"delayDistribution\":{\"type\":\"const\",\"const\":%s}, "
+ "\"forceNumInitialBundles\":%s}",
numRecords, payloadSize, delayMillis, initialSplits),
SyntheticSourceOptions.class);

Pipeline p = Pipeline.create(options);
PCollection<Row> rows =
p.apply(Read.from(new SyntheticUnboundedSource(syntheticSourceOptions)))
.apply(
"Convert to Rows",
MapElements.into(TypeDescriptors.rows())
.via(kv -> Row.withSchema(SCHEMA).addValue(kv.getValue()).build()))
.setRowSchema(SCHEMA);

int multiplier = (int) Math.ceil(1 / delayMillis);
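// i.e. ceil(rowsPerSecondPerSplit / 1000): when this exceeds 1 (sub-millisecond per-split delays),
// each source record is duplicated that many times below; with the defaults above it stays at 1
// and the multiplier step is skipped.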
if (multiplier > 1) {
rows = rows.apply("Multiply by " + multiplier, ParDo.of(Multiplier.of(multiplier)));
}

rows.apply(
Managed.write(Managed.ICEBERG)
.withConfig(
ImmutableMap.of(
"table",
tableId,
"catalog_properties",
catalogProps(numShards),
"triggering_frequency_seconds",
30)));

System.out.println("Dataflow job and table name: " + tableId);

p.run();
}

static class Multiplier extends DoFn<Row, Row> {
int multiplier;

Multiplier(int multiplier) {
this.multiplier = multiplier;
}

static Multiplier of(int multiplier) {
return new Multiplier(multiplier);
}

@ProcessElement
public void process(@Element Row row, OutputReceiver<Row> out) {
for (int i = 0; i < multiplier; i++) {
out.output(row);
}
}
}

static void createTableWithPartitions(String tableId, int numPartitions) {
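// Pre-creates the table with a bucket("bytes", numPartitions) spec so data is hashed across
// numPartitions Iceberg partitions.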
org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(SCHEMA);
PartitionSpec spec =
PartitionSpec.builderFor(icebergSchema).bucket("bytes", numPartitions).build();
catalog.createTable(TableIdentifier.parse(tableId), icebergSchema, spec);
}

static Map<String, String> catalogProps(int numShards) {
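// Note: "num_shards" is not a standard Iceberg catalog property; the experimental write path in
// this PR reads it back out of the catalog properties map to size its explicit sharding.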
return ImmutableMap.of(
"catalog-impl",
BQMS_CATALOG,
"io-impl",
"org.apache.iceberg.gcp.gcs.GCSFileIO",
"gcp_project",
PROJECT,
"gcp_location",
"us-central1",
"warehouse",
WAREHOUSE,
"num_shards",
String.valueOf(numShards));
}

static Catalog catalog =
CatalogUtil.loadCatalog(BQMS_CATALOG, "bqms_catalog", catalogProps(0), new Configuration());
}