-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][pulsar-io] Added support for generic record and raw JSON string schemas to CassandraSink #16179
base: master
Are you sure you want to change the base?
Conversation
@tspannhw can you review and upvote? |
The pr had no activity for 30 days, mark with Stale label. |
/pulsarbot run-failure-checks |
/pulsarbot ready-to-test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work, thanks
I left some comments, please take a look
pulsar-io/cassandra/pom.xml
Outdated
<dependency> | ||
<groupId>org.apache.pulsar</groupId> | ||
<artifactId>pulsar-functions-local-runner-original</artifactId> | ||
<version>2.11.0-SNAPSHOT</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please use project.version
public class CassandraConnector { | ||
|
||
private Cluster cluster; | ||
private Session session; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have to handle concurrent access properly to these fields
|
||
public StringRecordWrapper(String jsonString) { | ||
super(jsonString); | ||
valuesMap = new Gson().fromJson(jsonString, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we can cache this "new Gson()" instance in a static field
|
||
package org.apache.pulsar.io.cassandra.util; | ||
|
||
import com.datastax.driver.core.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please do not use star imports
pulsar-io/cassandra/src/test/java/org/apache/pulsar/io/cassandra/CassandraSinkExec.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
private static final void sendData() throws InterruptedException { | ||
TimeUnit.SECONDS.sleep(10); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to sleep a fixed amount of time ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only reason I sleep 10 seconds, is so I can easily visualize the data as it is published to the system. The delay allows me time to execute a query against Cassandra in order to detect the new records.
...ssandra/src/test/java/org/apache/pulsar/io/cassandra/producers/InputTopicProducerThread.java
Outdated
Show resolved
Hide resolved
Codecov Report
@@ Coverage Diff @@
## master #16179 +/- ##
============================================
- Coverage 46.34% 46.29% -0.05%
- Complexity 10394 10420 +26
============================================
Files 703 703
Lines 68838 68858 +20
Branches 7379 7383 +4
============================================
- Hits 31905 31880 -25
- Misses 33324 33375 +51
+ Partials 3609 3603 -6
Flags with carried forward coverage won't be shown. Click here to find out more.
|
/pulsarbot run-failure-checks |
@eolivelli I have made the requested changes, can you PTAL when you get a chance? Thank! |
@eolivelli , Can you please take a look at this when you get the chance? Thanks again! |
@eolivelli Can I please get some feedback on these changes I made in response to your initial feedback? Thanks again for the review, I really appreciated it. |
Signed-off-by: tison <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@david-streamlio Please fix the compilation failure. It seems that you use JUnit while Pulsar use TestNG as the test platform.
I've pushed a merge commit and fixing for license header to your remote branch so be aware to git pull
before working on it.
BTW, please try to expand all star import.
/pulsarbot run-failure-checks |
@tisonkun @eolivelli I would appreciate another review when you have the time. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
Motivation
The current implementation of the Cassandra Sink connector only supported a single schema type (key, string). This is not useful for production. So I modified the code to be able to support any schema type in Cassandra.
Modifications
Added classes that interrogate the database to determine the schema type at runtime. I also added a framework that will extract the values from the supported incoming schema types (GenericRecord, and String) using the table metadata.
Verifying this change
(Please pick either of the following options)
This change added tests and can be verified as follows:
Added integration tests for testing against a Cassandra database
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation
Check the box below or label this PR directly.
Need to update docs?
doc-required
(Your PR needs to update docs and you will update later)