Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: close producer when topic is deleted #4147

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/fluvio-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ producer-file-io = ["fluvio-cli-common/file-records"]
[dependencies]
async-channel = { workspace = true }
async-trait = { workspace = true }
async-std = { workspace = true }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't bring in async-std.

anyhow = { workspace = true }
bytesize = { workspace = true, features = ['serde'] }
clap = { workspace = true, features = ["std", "derive", "string", "help", "usage", "env", "error-context"] }
Expand Down
80 changes: 60 additions & 20 deletions crates/fluvio-cli/src/client/produce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,23 @@ mod cmd {
#[cfg(feature = "producer-file-io")]
use std::path::PathBuf;

use async_std::io::stdin;
use async_trait::async_trait;
use fluvio_future::io::StreamExt;
use fluvio_sc_schema::message::MsgType;
use fluvio_sc_schema::topic::TopicSpec;
#[cfg(feature = "producer-file-io")]
use futures::future::join_all;
use clap::Parser;
use tokio::select;
use tracing::{error, warn};
use humantime::parse_duration;
use anyhow::Result;

use fluvio::{
Compression, Fluvio, FluvioError, TopicProducerPool, TopicProducerConfigBuilder, RecordKey,
ProduceOutput, DeliverySemantic, SmartModuleContextData, Isolation, SmartModuleInvocation,
Compression, DeliverySemantic, Fluvio, FluvioAdmin, FluvioError, Isolation, ProduceOutput,
RecordKey, SmartModuleContextData, SmartModuleInvocation, TopicProducerConfigBuilder,
TopicProducerPool,
};
use fluvio_extension_common::Terminal;
use fluvio_types::print_cli_ok;
Expand Down Expand Up @@ -243,16 +249,18 @@ mod cmd {
.await?,
);

let admin = fluvio.admin().await;

#[cfg(feature = "producer-file-io")]
if self.raw {
self.process_raw_file(&producer).await?;
} else {
self.produce_lines(producer.clone()).await?;
self.produce_lines(producer.clone(), &admin).await?;
};

#[cfg(not(feature = "producer-file-io"))]
{
self.produce_lines(producer.clone()).await?;
self.produce_lines(producer.clone(), &admin).await?;
}

producer.flush().await?;
Expand Down Expand Up @@ -315,7 +323,11 @@ mod cmd {
}
}

async fn produce_lines(&self, producer: Arc<TopicProducerPool>) -> Result<()> {
async fn produce_lines(
&self,
producer: Arc<TopicProducerPool>,
admin: &FluvioAdmin,
) -> Result<()> {
#[cfg(feature = "producer-file-io")]
if let Some(path) = &self.file {
let reader = BufReader::new(File::open(path)?);
Expand All @@ -340,7 +352,7 @@ mod cmd {
.collect::<Result<Vec<_>, _>>()?;
}
} else {
self.producer_stdin(&producer).await?
self.producer_stdin(&producer, admin).await?
}

#[cfg(not(feature = "producer-file-io"))]
Expand All @@ -349,27 +361,55 @@ mod cmd {
Ok(())
}

async fn producer_stdin(&self, producer: &Arc<TopicProducerPool>) -> Result<()> {
let mut lines = BufReader::new(std::io::stdin()).lines();
async fn producer_stdin(
&self,
producer: &Arc<TopicProducerPool>,
admin: &FluvioAdmin,
) -> Result<()> {
use async_std::io::prelude::*;
use async_std::io::BufReader;
let mut lines = BufReader::new(stdin()).lines();
let mut partition_stream = admin.watch::<TopicSpec>().await?;

if self.interactive_mode() {
eprint!("> ");
}

while let Some(Ok(line)) = lines.next() {
let produce_output = self.produce_line(producer, &line).await?;

if let Some(produce_output) = produce_output {
if self.delivery_semantic != DeliverySemantic::AtMostOnce {
// ensure it was properly sent
produce_output.wait().await?;
loop {
select! {
line = lines.next() => {
if let Some(Ok(line)) = line {
let produce_output = self.produce_line(producer, &line).await?;

if let Some(produce_output) = produce_output {
if self.delivery_semantic != DeliverySemantic::AtMostOnce {
// ensure it was properly sent
produce_output.wait().await?;
}
}

if self.interactive_mode() {
print_cli_ok!();
eprint!("> ");
}
} else {
// When stdin is closed, we break the loop
break;
}
}
stream = partition_stream.next() => {
if let Some(stream) = stream {
let stream = stream?;
for change in stream.inner().changes {
if change.header == MsgType::DELETE && change.content.name == self.topic {
return Err(CliError::TopicDeleted(self.topic.clone()).into());
}
}
}
}
}

if self.interactive_mode() {
print_cli_ok!();
eprint!("> ");
}
}

Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions crates/fluvio-cli/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,6 @@ pub enum CliError {
SmartModuleConfigBuilder(#[from] fluvio_smartengine::SmartModuleConfigBuilderError),
#[error("Hub error: {0}")]
HubError(String),
#[error("Topic \"{0}\" was deleted")]
TopicDeleted(String),
}
20 changes: 20 additions & 0 deletions tests/cli/fluvio_smoke_tests/produce-error.bats
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,23 @@ teardown_file() {
run bash -c 'echo abcdefgh | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_2" --compression lz4'
assert_failure
}

# Delete topic should stop producer and return error
@test "Delete topic while producing" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on fluvio cli stable version"
fi
if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on cluster stable version"
fi

run bash -c '/usr/bin/expect <<-EOF
spawn "$FLUVIO_BIN" produce "$TOPIC_NAME"
expect "> "
exec "$FLUVIO_BIN" topic delete "$TOPIC_NAME"
expect "Topic \"$TOPIC_NAME\" was deleted"
exit
EOF'
assert_success
assert_output --partial "Topic \"$TOPIC_NAME\" was deleted"
}
4 changes: 2 additions & 2 deletions tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ setup_file() {
}

@test "Home status at remote 1 should show the home cluster connected" {
sleep 15
sleep 30
run timeout 15s "$FLUVIO_BIN" home status

assert_success
Expand Down Expand Up @@ -165,7 +165,7 @@ setup_file() {
}

@test "Home status at remote 2 should show the home cluster connected" {
sleep 15
sleep 30
run timeout 15s "$FLUVIO_BIN" home status

assert_success
Expand Down
Loading