This library is DEPRECATED and will receive no further maintenance. Please use Alpakka
instead.
For more information about Kinesis please visit the official documentation.
The KCL Source can read from several shards and rebalance automatically when other Workers are started or stopped. It also handles record sequence checkpoints.
For more information about KCL please visit the official documentation.
<repository>
<snapshots>
<enabled>false</enabled>
</snapshots>
<id>bintray-<username>-maven</id>
<name>bintray</name>
<url>https://dl.bintray.com/content/aserrallerios/maven</url>
</repository>
...
<dependency>
<groupId>aserrallerios</groupId>
<artifactId>kcl-akka-stream_2.11</artifactId>
<version>0.4</version>
<type>pom</type>
</dependency>
resolvers += "aserrallerios bintray" at "https://dl.bintray.com/content/aserrallerios/maven"
libraryDependencies += "aserrallerios" %% "kcl-akka-stream" % "0.4"
The KCL Worker Source needs to create and manage Worker instances in order to consume records from Kinesis Streams.
In order to use it, you need to provide a Worker builder and the Source settings:
import scala.concurrent.duration._

val workerSourceSettings = KinesisWorkerSourceSettings(
  bufferSize = 1000,
  terminateStreamGracePeriod = 1.minute,
  backpressureTimeout = 1.minute)
val builder: IRecordProcessorFactory => Worker = { recordProcessorFactory =>
  new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(
      new KinesisClientLibConfiguration(
        "myApp",
        "myStreamName",
        DefaultAWSCredentialsProviderChain.getInstance(),
        s"${
          import scala.sys.process._
          "hostname".!!.trim()
        }:${java.util.UUID.randomUUID()}"
      )
    )
    .build()
}
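The worker identifier passed to the configuration above combines the host name with a random UUID. As a minimal sketch, a similar identifier could be built in-process instead of spawning a `hostname` command. The names `WorkerIds`, `hostName` and `newWorkerId` are hypothetical and not part of kcl-akka-stream, and the value returned by `InetAddress` may differ from what the `hostname` command prints on a given machine.

```scala
import java.net.InetAddress
import java.util.UUID
import scala.util.Try

// Hypothetical helper, not part of kcl-akka-stream.
object WorkerIds {
  // Resolve the local host name in-process; fall back to a placeholder
  // if the lookup fails (for example, when the name cannot be resolved).
  def hostName(): String =
    Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-host")

  // Combine the host name with a random UUID so that two workers started
  // on the same machine still get distinct identifiers.
  def newWorkerId(): String = s"${hostName()}:${UUID.randomUUID()}"
}
```

The result of `WorkerIds.newWorkerId()` could then be passed as the last argument of `KinesisClientLibConfiguration` in place of the interpolated string.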
The Source also needs an ExecutionContext to run the Worker's thread and to commit/checkpoint records. Then the Source can be created as usual:
implicit val executionContext: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1000))
KinesisWorkerSource(builder, workerSourceSettings).to(Sink.ignore)
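A dedicated thread pool like the one above keeps running until it is shut down explicitly. The following is a rough sketch of one way to scope such a pool so it is always released. `WorkerExecution`, `withPool` and `demo` are hypothetical names, and the pool size and timeouts are illustrative values only, not recommendations from the library.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

// Hypothetical helper, not part of kcl-akka-stream.
object WorkerExecution {
  // Run `body` with a dedicated fixed-size pool and always shut the pool down,
  // even if `body` throws.
  def withPool[A](threads: Int)(body: ExecutionContext => A): A = {
    val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(threads))
    try body(ec)
    finally {
      ec.shutdown()
      ec.awaitTermination(10, TimeUnit.SECONDS)
    }
  }

  // Example: run a trivial task on the pool and wait for its result.
  def demo(): Int = withPool(4) { implicit ec =>
    Await.result(Future(21 * 2), 5.seconds)
  }
}
```

In a real application the stream would be created and run inside the `withPool` body, so the pool outlives the Worker and is released afterwards.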
The KCL Worker Source publishes messages downstream that can be committed to mark each consumer's progress per shard. Checkpointing can be done manually or with the provided checkpointer Flow/Sink.
In order to use the Flow/Sink you must provide additional checkpoint settings:
val checkpointSettings = KinesisWorkerCheckpointSettings(100, 30 seconds)
KinesisWorkerSource(builder, workerSourceSettings)
.via(KinesisWorkerSource.checkpointRecordsFlow(checkpointSettings))
.to(Sink.ignore)
KinesisWorkerSource(builder, workerSourceSettings).to(
KinesisWorkerSource.checkpointRecordsSink(checkpointSettings))
Note that the checkpointer Flow may not preserve the input order of records from different shards.
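Because progress is tracked per shard, interleaving across shards does not by itself affect which sequence number is the latest one for a given shard. The sketch below illustrates that idea in plain Scala. It is not the library's implementation: `Rec` is a hypothetical stand-in for a consumed record, and the sketch assumes that only the order within a single shard matters to downstream code.

```scala
// Hypothetical model, not part of kcl-akka-stream.
object ShardOrdering {
  // Stand-in for a consumed record: only the fields needed here.
  final case class Rec(shardId: String, sequenceNumber: BigInt)

  // For each shard, pick the highest sequence number seen in a batch.
  def latestPerShard(batch: Seq[Rec]): Map[String, BigInt] =
    batch.groupBy(_.shardId).map { case (shard, recs) =>
      shard -> recs.map(_.sequenceNumber).max
    }

  // True if, within every shard, sequence numbers appear in increasing order,
  // regardless of how records from different shards are interleaved.
  def orderedWithinShards(records: Seq[Rec]): Boolean =
    records.groupBy(_.shardId).values.forall { recs =>
      val seqs = recs.map(_.sequenceNumber)
      seqs.zip(seqs.drop(1)).forall { case (a, b) => a < b }
    }
}
```

If downstream logic depends on a global order across shards, it would need to impose that order itself, for example by sorting on a timestamp carried in the record payload.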
Copyright (c) 2018 Albert Serrallé
This version of kcl-akka-stream is released under the Apache License, Version 2.0 (see LICENSE.txt). By downloading and using this software you agree to the End-User License Agreement (EULA).
We build on a number of third-party software tools, with the following licenses:
Third-Party software | License |
---|---|
amazon-kinesis-client | Amazon Software License |