This repository has been archived by the owner on May 21, 2020. It is now read-only.

Custom Akka Stream Sources and Flows to interact with Kinesis streams using the Kinesis Client Library

aserrallerios/kcl-akka-stream

This library is DEPRECATED and will receive no further maintenance. Please use Alpakka instead:

2.0.0 release notes

Akka Stream Source for Kinesis Client Library

For more information about Kinesis please visit the official documentation.

The KCL Source can read from several shards and rebalance automatically when other Workers are started or stopped. It also handles record sequence checkpoints.

For more information about KCL please visit the official documentation.

Installation

<repository>
  <snapshots>
    <enabled>false</enabled>
  </snapshots>
  <id>bintray-<username>-maven</id>
  <name>bintray</name>
  <url>https://dl.bintray.com/content/aserrallerios/maven</url>
</repository>
...
<dependency>
  <groupId>aserrallerios</groupId>
  <artifactId>kcl-akka-stream_2.11</artifactId>
  <version>0.4</version>
  <type>pom</type>
</dependency>

resolvers += "aserrallerios bintray" at "https://dl.bintray.com/content/aserrallerios/maven"
libraryDependencies += "aserrallerios" %% "kcl-akka-stream" % "0.4"

Usage

AWS KCL Worker Source & checkpointer

The KCL Worker Source needs to create and manage Worker instances in order to consume records from Kinesis Streams.

In order to use it, you need to provide a Worker builder and the Source settings:

import scala.concurrent.duration._

val workerSourceSettings = KinesisWorkerSourceSettings(
    bufferSize = 1000,
    terminateStreamGracePeriod = 1.minute,
    backpressureTimeout = 1.minute)

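import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{KinesisClientLibConfiguration, Worker}
import scala.sys.process._

val builder: IRecordProcessorFactory => Worker = { recordProcessorFactory =>
  // Unique worker ID: this host's name (via the `hostname` command) plus a random UUID.
  val hostname = "hostname".!!.trim
  val workerId = s"$hostname:${java.util.UUID.randomUUID()}"

  new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(
      new KinesisClientLibConfiguration(
        "myApp",        // KCL application name
        "myStreamName", // Kinesis stream to consume
        DefaultAWSCredentialsProviderChain.getInstance(),
        workerId
      )
    )
    .build()
}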
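
The worker ID above combines the host name with a random UUID so that every Worker identifies itself uniquely. As a minimal sketch of the same idea without spawning an external process, the helper below asks the JVM for the host name instead. `WorkerIdSketch`, `newWorkerId`, and the `"unknown-host"` fallback are hypothetical names chosen for illustration; they are not part of this library or of KCL.

```scala
import java.net.InetAddress
import java.util.UUID
import scala.util.Try

// Hypothetical helper, not part of kcl-akka-stream or KCL.
object WorkerIdSketch {
  // Builds "<host>:<uuid>". Falls back to a fixed label (an assumption made
  // here) when the local host name cannot be resolved.
  def newWorkerId(): String = {
    val host = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-host")
    s"$host:${UUID.randomUUID()}"
  }
}
```

The resulting string could be passed as the last argument of `KinesisClientLibConfiguration` in place of the inline expression. Whether the JVM's host name matches the output of `hostname` depends on the machine's network configuration.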

The Source also needs an ExecutionContext to run the Worker's thread and to commit/checkpoint records. Then the Source can be created as usual:

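import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import akka.stream.scaladsl.Sink

// Give the implicit a name so that it can be resolved from the enclosing scope.
implicit val executionContext: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1000))

KinesisWorkerSource(builder, workerSourceSettings).to(Sink.ignore)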
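
Because the thread pool behind the ExecutionContext is created by the application, the application is also responsible for shutting it down. The following is a minimal sketch of one way to scope a dedicated pool, using only the Scala and Java standard libraries. `DedicatedPoolSketch`, `withDedicatedPool`, and `demo` are hypothetical names, and the 10-second shutdown wait is an arbitrary assumption.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

// Hypothetical helper, not part of kcl-akka-stream.
object DedicatedPoolSketch {
  // Runs `body` with an ExecutionContext backed by a fixed-size pool and
  // shuts the pool down afterwards, even if `body` throws.
  def withDedicatedPool[A](threads: Int)(body: ExecutionContext => A): A = {
    val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(threads))
    try body(ec)
    finally {
      ec.shutdown()                              // stop accepting new tasks
      ec.awaitTermination(10, TimeUnit.SECONDS)  // wait for running tasks (assumed limit)
    }
  }

  // Usage: the context is made implicit inside the block, where a stream
  // that needs an ExecutionContext could be built and run.
  def demo(): Int =
    withDedicatedPool(4) { implicit ec =>
      Await.result(Future(21 * 2), 5.seconds)
    }
}
```

In a real application the pool would normally live as long as the stream does, so the shutdown would be tied to stream completion rather than to a block of code.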

Committing records

The KCL Worker Source publishes messages downstream that can be committed to mark each consumer's progress per shard. Committing can be done manually or with the provided checkpointer Flow/Sink.

In order to use the Flow/Sink you must provide additional checkpoint settings:

val checkpointSettings = KinesisWorkerCheckpointSettings(100, 30.seconds)

KinesisWorkerSource(builder, workerSourceSettings)
  .via(KinesisWorkerSource.checkpointRecordsFlow(checkpointSettings))
  .to(Sink.ignore)

KinesisWorkerSource(builder, workerSourceSettings).to(
  KinesisWorkerSource.checkpointRecordsSink(checkpointSettings))

Note that the checkpointer Flow may not preserve the input order of records from different shards.
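
Checkpoints mark progress per shard, so it is the order within each shard that matters when committing. The sketch below illustrates that idea on plain data: given a batch of records interleaved across shards, it finds the highest sequence number seen for each shard. This is not the library's implementation. `Rec`, `CheckpointSketch`, and `latestPerShard` are hypothetical names, and `Rec` is only a stand-in for the messages the Source emits.

```scala
// Hypothetical stand-in for a committable record; not the library's type.
final case class Rec(shardId: String, sequenceNumber: BigInt)

object CheckpointSketch {
  // For one batch, returns the highest sequence number seen on each shard.
  // The relative order of records from different shards does not affect the result.
  def latestPerShard(batch: Seq[Rec]): Map[String, BigInt] =
    batch
      .groupBy(_.shardId)
      .map { case (shard, recs) => shard -> recs.map(_.sequenceNumber).max }
}
```

For example, a batch containing sequence numbers 1 and 3 from `shard-0` and 5 and 4 from `shard-1` yields 3 for `shard-0` and 5 for `shard-1`, regardless of how the two shards are interleaved.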

License

Copyright (c) 2018 Albert Serrallé

This version of kcl-akka-stream is released under the Apache License, Version 2.0 (see LICENSE.txt). By downloading and using this software you agree to the End-User License Agreement (EULA).

We build on a number of third-party software tools, with the following licenses:

Java Libraries

Third-Party software     License
amazon-kinesis-client    Amazon Software License
