Skip to content

Commit

Permalink
provide ability to pass arguments to commands (#525)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored Jan 29, 2025
1 parent 2b9b57f commit e2114c5
Show file tree
Hide file tree
Showing 16 changed files with 54 additions and 43 deletions.
9 changes: 9 additions & 0 deletions lib/karafka/web/pro/commanding/commands/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ module Commanding
module Commands
# Base for all the commands
class Base
# @return [Hash]
attr_reader :params

# @param params [Hash] command details (if any). Some commands may require extra
# details to work. They can be obtained from here.
def initialize(params)
@params = params
end

private

# @return [String] current process id
Expand Down
10 changes: 5 additions & 5 deletions lib/karafka/web/pro/commanding/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,23 @@ module Commanding
# Dispatcher requires Web UI to have a producer
class Dispatcher
# What schema do we have in current Karafka version for commands dispatches
SCHEMA_VERSION = '1.0.0'
SCHEMA_VERSION = '1.1.0'

class << self
# Dispatches the command request
#
# @param name [String, Symbol] name of the command we want to deal with in the process
# @param process_id [String] id of the process. We use name instead of id only
# because in the web ui we work with the full name and it is easier. Since
def command(name, process_id)
# @param params [Hash] hash with extra command params that some commands may use.
def command(name, process_id, params = {})
produce(
process_id,
{
schema_version: SCHEMA_VERSION,
type: 'command',
command: {
name: name
},
# Name is auto-generated and required. Should not be changed
command: params.merge(name: name),
dispatched_at: Time.now.to_f,
process: {
id: process_id
Expand Down
32 changes: 17 additions & 15 deletions lib/karafka/web/pro/commanding/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,26 +62,28 @@ def call
@listener.each do |message|
next unless @matcher.matches?(message)

control(message.payload[:command][:name])
control(message.payload[:command])
end
end

# Runs the expected command
#
# @param command [String] command expected to run
def control(command)
case command
when 'trace'
Commands::Trace.new.call
when 'stop'
Commands::Stop.new.call
when 'quiet'
Commands::Quiet.new.call
else
# We raise it and will be rescued, reported and ignored. We raise it as this should
# not happen unless there are version conflicts
raise ::Karafka::Errors::UnsupportedCaseError, command
end
# @param params [Hash] command request params hash
def control(params)
command = case params[:name]
when 'trace'
Commands::Trace
when 'stop'
Commands::Stop
when 'quiet'
Commands::Quiet
else
# We raise it and will be rescued, reported and ignored. We raise it as
# this should not happen unless there are version conflicts
raise ::Karafka::Errors::UnsupportedCaseError, command
end

command.new(params).call
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/fixtures/consumers_commands/current.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"schema_version": "1.0.0",
"schema_version": "1.1.0",
"type": "command",
"command": {
"name": "quiet"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"schema_version": "1.0.0",
"schema_version": "1.1.0",
"type": "command",
"command": {
"name": "quiet"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"schema_version": "1.0.0",
"schema_version": "1.1.0",
"type": "command",
"command": {
"name": "stop"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"schema_version": "1.0.0",
"schema_version": "1.1.0",
"type": "command",
"command": {
"name": "stop"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"schema_version": "1.0.0",
"schema_version": "1.1.0",
"type": "command",
"command": {
"name": "trace"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"schema_version": "1.0.0",
"schema_version": "1.1.0",
"type": "result",
"command": {
"name": "trace"
Expand Down
2 changes: 1 addition & 1 deletion spec/lib/karafka/web/pro/commanding/commands/quiet_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# See LICENSE for details.

RSpec.describe_current do
subject(:quiet_command) { described_class.new }
subject(:quiet_command) { described_class.new({}) }

before { allow(Process).to receive(:kill) }

Expand Down
2 changes: 1 addition & 1 deletion spec/lib/karafka/web/pro/commanding/commands/stop_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# See LICENSE for details.

RSpec.describe_current do
subject(:stop_command) { described_class.new }
subject(:stop_command) { described_class.new({}) }

before { allow(Process).to receive(:kill) }

Expand Down
2 changes: 1 addition & 1 deletion spec/lib/karafka/web/pro/commanding/commands/trace_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# See LICENSE for details.

RSpec.describe_current do
subject(:trace_command) { described_class.new }
subject(:trace_command) { described_class.new({}) }

let(:dispatcher) { Karafka::Web::Pro::Commanding::Dispatcher }
let(:test_thread) { Thread.new { sleep(0.5) } }
Expand Down
6 changes: 3 additions & 3 deletions spec/lib/karafka/web/pro/commanding/manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
end

context 'when command is trace' do
let(:trace_command) { Karafka::Web::Pro::Commanding::Commands::Trace.new }
let(:trace_command) { Karafka::Web::Pro::Commanding::Commands::Trace.new({}) }
let(:command_name) { 'trace' }

before do
Expand All @@ -81,7 +81,7 @@
end

context 'when command is quiet' do
let(:quiet_command) { Karafka::Web::Pro::Commanding::Commands::Quiet.new }
let(:quiet_command) { Karafka::Web::Pro::Commanding::Commands::Quiet.new({}) }
let(:command_name) { 'quiet' }

before do
Expand All @@ -96,7 +96,7 @@
end

context 'when command is stop' do
let(:stop_command) { Karafka::Web::Pro::Commanding::Commands::Stop.new }
let(:stop_command) { Karafka::Web::Pro::Commanding::Commands::Stop.new({}) }
let(:command_name) { 'stop' }

before do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
sleep(1)
message = Karafka::Admin.read_topic(commands_topic, 0, 1, -1).first
expect(message.key).to eq(process_id)
expect(message.payload[:schema_version]).to eq('1.0.0')
expect(message.payload[:schema_version]).to eq('1.1.0')
expect(message.payload[:type]).to eq('command')
expect(message.payload[:dispatched_at]).not_to be_nil
expect(message.payload[:command]).to eq(name: 'trace')
Expand All @@ -47,7 +47,7 @@
sleep(1)
message = Karafka::Admin.read_topic(commands_topic, 0, 1, -1).first
expect(message.key).to eq(process_id)
expect(message.payload[:schema_version]).to eq('1.0.0')
expect(message.payload[:schema_version]).to eq('1.1.0')
expect(message.payload[:type]).to eq('command')
expect(message.payload[:dispatched_at]).not_to be_nil
expect(message.payload[:command]).to eq(name: 'quiet')
Expand All @@ -69,7 +69,7 @@
sleep(1)
message = Karafka::Admin.read_topic(commands_topic, 0, 1, -1).first
expect(message.key).to eq(process_id)
expect(message.payload[:schema_version]).to eq('1.0.0')
expect(message.payload[:schema_version]).to eq('1.1.0')
expect(message.payload[:type]).to eq('command')
expect(message.payload[:dispatched_at]).not_to be_nil
expect(message.payload[:command]).to eq(name: 'stop')
Expand All @@ -91,7 +91,7 @@
sleep(1)
message = Karafka::Admin.read_topic(commands_topic, 0, 1, -1).first
expect(message.key).to eq('*')
expect(message.payload[:schema_version]).to eq('1.0.0')
expect(message.payload[:schema_version]).to eq('1.1.0')
expect(message.payload[:type]).to eq('command')
expect(message.payload[:dispatched_at]).not_to be_nil
expect(message.payload[:command]).to eq(name: 'quiet')
Expand All @@ -113,7 +113,7 @@
sleep(1)
message = Karafka::Admin.read_topic(commands_topic, 0, 1, -1).first
expect(message.key).to eq('*')
expect(message.payload[:schema_version]).to eq('1.0.0')
expect(message.payload[:schema_version]).to eq('1.1.0')
expect(message.payload[:type]).to eq('command')
expect(message.payload[:dispatched_at]).not_to be_nil
expect(message.payload[:command]).to eq(name: 'stop')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
stop
quiet
].each do |type|
data = Fixtures.consumers_commands_json("v1.0.0_#{type}", symbolize_names: false)
data = Fixtures.consumers_commands_json("v1.1.0_#{type}", symbolize_names: false)
id = ['*', SecureRandom.uuid].sample
data['process']['id'] = id
produce(commands_topic, data.to_json, key: id)
Expand Down Expand Up @@ -163,7 +163,7 @@
context "when visiting #{command} command" do
before do
topics_config.consumers.commands = commands_topic
produce(commands_topic, Fixtures.consumers_commands_file("v1.0.0_#{command}.json"))
produce(commands_topic, Fixtures.consumers_commands_file("v1.1.0_#{command}.json"))
get 'consumers/commands/0'
end

Expand All @@ -183,7 +183,7 @@
context "when visiting #{command} command that is not with a compatible schema" do
before do
topics_config.consumers.commands = commands_topic
data = Fixtures.consumers_commands_json("v1.0.0_#{command}")
data = Fixtures.consumers_commands_json("v1.1.0_#{command}")
data[:schema_version] = '0.0.1'
produce(commands_topic, data.to_json)
get 'consumers/commands/0'
Expand All @@ -205,7 +205,7 @@
context 'when visiting trace result' do
before do
topics_config.consumers.commands = commands_topic
produce(commands_topic, Fixtures.consumers_commands_file('v1.0.0_trace_result.json'))
produce(commands_topic, Fixtures.consumers_commands_file('v1.1.0_trace_result.json'))
get 'consumers/commands/0'
end

Expand All @@ -225,7 +225,7 @@
context 'when visiting trace result that is not with a compatible schema' do
before do
topics_config.consumers.commands = commands_topic
data = Fixtures.consumers_commands_json('v1.0.0_trace_result')
data = Fixtures.consumers_commands_json('v1.1.0_trace_result')
data[:schema_version] = '0.0.1'
produce(commands_topic, data.to_json)
get 'consumers/commands/0'
Expand Down

0 comments on commit e2114c5

Please sign in to comment.