From e2114c50518a70972db77efb9969ed44c9680950 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Wed, 29 Jan 2025 15:22:17 +0100 Subject: [PATCH] provide ability to pass arguments to commands (#525) --- .../web/pro/commanding/commands/base.rb | 9 ++++++ lib/karafka/web/pro/commanding/dispatcher.rb | 10 +++--- lib/karafka/web/pro/commanding/manager.rb | 32 ++++++++++--------- spec/fixtures/consumers_commands/current.json | 2 +- .../{v1.0.0_quiet.json => v1.1.0_quiet.json} | 2 +- ...0_quiet_all.json => v1.1.0_quiet_all.json} | 2 +- .../{v1.0.0_stop.json => v1.1.0_stop.json} | 2 +- ...0.0_stop_all.json => v1.1.0_stop_all.json} | 2 +- .../{v1.0.0_trace.json => v1.1.0_trace.json} | 2 +- ...e_result.json => v1.1.0_trace_result.json} | 2 +- .../web/pro/commanding/commands/quiet_spec.rb | 2 +- .../web/pro/commanding/commands/stop_spec.rb | 2 +- .../web/pro/commanding/commands/trace_spec.rb | 2 +- .../web/pro/commanding/manager_spec.rb | 6 ++-- .../consumers/commanding_controller_spec.rb | 10 +++--- .../consumers/commands_controller_spec.rb | 10 +++--- 16 files changed, 54 insertions(+), 43 deletions(-) rename spec/fixtures/consumers_commands/{v1.0.0_quiet.json => v1.1.0_quiet.json} (84%) rename spec/fixtures/consumers_commands/{v1.0.0_quiet_all.json => v1.1.0_quiet_all.json} (82%) rename spec/fixtures/consumers_commands/{v1.0.0_stop.json => v1.1.0_stop.json} (84%) rename spec/fixtures/consumers_commands/{v1.0.0_stop_all.json => v1.1.0_stop_all.json} (82%) rename spec/fixtures/consumers_commands/{v1.0.0_trace.json => v1.1.0_trace.json} (84%) rename spec/fixtures/consumers_commands/{v1.0.0_trace_result.json => v1.1.0_trace_result.json} (99%) diff --git a/lib/karafka/web/pro/commanding/commands/base.rb b/lib/karafka/web/pro/commanding/commands/base.rb index 86c852ba..8dcf8fca 100644 --- a/lib/karafka/web/pro/commanding/commands/base.rb +++ b/lib/karafka/web/pro/commanding/commands/base.rb @@ -11,6 +11,15 @@ module Commanding module Commands # Base for all the commands class Base + # @return [Hash] + attr_reader :params + + # @param params [Hash] command details (if any). Some commands may require extra + # details to work. They can be obtained from here. + def initialize(params) + @params = params + end + private # @return [String] current process id diff --git a/lib/karafka/web/pro/commanding/dispatcher.rb b/lib/karafka/web/pro/commanding/dispatcher.rb index 08668655..46e22a63 100644 --- a/lib/karafka/web/pro/commanding/dispatcher.rb +++ b/lib/karafka/web/pro/commanding/dispatcher.rb @@ -14,7 +14,7 @@ module Commanding # Dispatcher requires Web UI to have a producer class Dispatcher # What schema do we have in current Karafka version for commands dispatches - SCHEMA_VERSION = '1.0.0' + SCHEMA_VERSION = '1.1.0' class << self # Dispatches the command request @@ -22,15 +22,15 @@ class << self # @param name [String, Symbol] name of the command we want to deal with in the process # @param process_id [String] id of the process. We use name instead of id only # because in the web ui we work with the full name and it is easier. Since - def command(name, process_id) + # @param params [Hash] hash with extra command params that some commands may use. + def command(name, process_id, params = {}) produce( process_id, { schema_version: SCHEMA_VERSION, type: 'command', - command: { - name: name - }, + # Name is auto-generated and required. Should not be changed + command: params.merge(name: name), dispatched_at: Time.now.to_f, process: { id: process_id diff --git a/lib/karafka/web/pro/commanding/manager.rb b/lib/karafka/web/pro/commanding/manager.rb index f02c3219..ac649b19 100644 --- a/lib/karafka/web/pro/commanding/manager.rb +++ b/lib/karafka/web/pro/commanding/manager.rb @@ -62,26 +62,28 @@ def call @listener.each do |message| next unless @matcher.matches?(message) - control(message.payload[:command][:name]) + control(message.payload[:command]) end end # Runs the expected command # - # @param command [String] command expected to run - def control(command) - case command - when 'trace' - Commands::Trace.new.call - when 'stop' - Commands::Stop.new.call - when 'quiet' - Commands::Quiet.new.call - else - # We raise it and will be rescued, reported and ignored. We raise it as this should - # not happen unless there are version conflicts - raise ::Karafka::Errors::UnsupportedCaseError, command - end + # @param params [Hash] command request params hash + def control(params) + command = case params[:name] + when 'trace' + Commands::Trace + when 'stop' + Commands::Stop + when 'quiet' + Commands::Quiet + else + # We raise it and will be rescued, reported and ignored. We raise it as + # this should not happen unless there are version conflicts + raise ::Karafka::Errors::UnsupportedCaseError, command + end + + command.new(params).call end end end diff --git a/spec/fixtures/consumers_commands/current.json b/spec/fixtures/consumers_commands/current.json index 00c70c73..c8a0f422 120000 --- a/spec/fixtures/consumers_commands/current.json +++ b/spec/fixtures/consumers_commands/current.json @@ -1 +1 @@ -v1.0.0_quiet.json \ No newline at end of file +v1.1.0_quiet.json \ No newline at end of file diff --git a/spec/fixtures/consumers_commands/v1.0.0_quiet.json b/spec/fixtures/consumers_commands/v1.1.0_quiet.json similarity index 84% rename from spec/fixtures/consumers_commands/v1.0.0_quiet.json rename to spec/fixtures/consumers_commands/v1.1.0_quiet.json index d2da63ae..7b47340b 100644 --- a/spec/fixtures/consumers_commands/v1.0.0_quiet.json +++ b/spec/fixtures/consumers_commands/v1.1.0_quiet.json @@ -1,5 +1,5 @@ { - "schema_version": "1.0.0", + "schema_version": "1.1.0", "type": "command", "command": { "name": "quiet" diff --git a/spec/fixtures/consumers_commands/v1.0.0_quiet_all.json b/spec/fixtures/consumers_commands/v1.1.0_quiet_all.json similarity index 82% rename from spec/fixtures/consumers_commands/v1.0.0_quiet_all.json rename to spec/fixtures/consumers_commands/v1.1.0_quiet_all.json index 3dfd9792..309ce1db 100644 --- a/spec/fixtures/consumers_commands/v1.0.0_quiet_all.json +++ b/spec/fixtures/consumers_commands/v1.1.0_quiet_all.json @@ -1,5 +1,5 @@ { - "schema_version": "1.0.0", + "schema_version": "1.1.0", "type": "command", "command": { "name": "quiet" diff --git a/spec/fixtures/consumers_commands/v1.0.0_stop.json b/spec/fixtures/consumers_commands/v1.1.0_stop.json similarity index 84% rename from spec/fixtures/consumers_commands/v1.0.0_stop.json rename to spec/fixtures/consumers_commands/v1.1.0_stop.json index adbb7c60..5c588f37 100644 --- a/spec/fixtures/consumers_commands/v1.0.0_stop.json +++ b/spec/fixtures/consumers_commands/v1.1.0_stop.json @@ -1,5 +1,5 @@ { - "schema_version": "1.0.0", + "schema_version": "1.1.0", "type": "command", "command": { "name": "stop" diff --git a/spec/fixtures/consumers_commands/v1.0.0_stop_all.json b/spec/fixtures/consumers_commands/v1.1.0_stop_all.json similarity index 82% rename from spec/fixtures/consumers_commands/v1.0.0_stop_all.json rename to spec/fixtures/consumers_commands/v1.1.0_stop_all.json index 6c01b106..f8f30c62 100644 --- a/spec/fixtures/consumers_commands/v1.0.0_stop_all.json +++ b/spec/fixtures/consumers_commands/v1.1.0_stop_all.json @@ -1,5 +1,5 @@ { - "schema_version": "1.0.0", + "schema_version": "1.1.0", "type": "command", "command": { "name": "stop" diff --git a/spec/fixtures/consumers_commands/v1.0.0_trace.json b/spec/fixtures/consumers_commands/v1.1.0_trace.json similarity index 84% rename from spec/fixtures/consumers_commands/v1.0.0_trace.json rename to spec/fixtures/consumers_commands/v1.1.0_trace.json index d492748d..85a7fbda 100644 --- a/spec/fixtures/consumers_commands/v1.0.0_trace.json +++ b/spec/fixtures/consumers_commands/v1.1.0_trace.json @@ -1,5 +1,5 @@ { - "schema_version": "1.0.0", + "schema_version": "1.1.0", "type": "command", "command": { "name": "trace" diff --git a/spec/fixtures/consumers_commands/v1.0.0_trace_result.json b/spec/fixtures/consumers_commands/v1.1.0_trace_result.json similarity index 99% rename from spec/fixtures/consumers_commands/v1.0.0_trace_result.json rename to spec/fixtures/consumers_commands/v1.1.0_trace_result.json index d1e44592..cb0fb105 100644 --- a/spec/fixtures/consumers_commands/v1.0.0_trace_result.json +++ b/spec/fixtures/consumers_commands/v1.1.0_trace_result.json @@ -1,5 +1,5 @@ { - "schema_version": "1.0.0", + "schema_version": "1.1.0", "type": "result", "command": { "name": "trace" diff --git a/spec/lib/karafka/web/pro/commanding/commands/quiet_spec.rb b/spec/lib/karafka/web/pro/commanding/commands/quiet_spec.rb index f648e830..ea6f59c4 100644 --- a/spec/lib/karafka/web/pro/commanding/commands/quiet_spec.rb +++ b/spec/lib/karafka/web/pro/commanding/commands/quiet_spec.rb @@ -4,7 +4,7 @@ # See LICENSE for details. RSpec.describe_current do - subject(:quiet_command) { described_class.new } + subject(:quiet_command) { described_class.new({}) } before { allow(Process).to receive(:kill) } diff --git a/spec/lib/karafka/web/pro/commanding/commands/stop_spec.rb b/spec/lib/karafka/web/pro/commanding/commands/stop_spec.rb index b1da5421..338fdf51 100644 --- a/spec/lib/karafka/web/pro/commanding/commands/stop_spec.rb +++ b/spec/lib/karafka/web/pro/commanding/commands/stop_spec.rb @@ -4,7 +4,7 @@ # See LICENSE for details. RSpec.describe_current do - subject(:stop_command) { described_class.new } + subject(:stop_command) { described_class.new({}) } before { allow(Process).to receive(:kill) } diff --git a/spec/lib/karafka/web/pro/commanding/commands/trace_spec.rb b/spec/lib/karafka/web/pro/commanding/commands/trace_spec.rb index e46256b8..f5ad4b03 100644 --- a/spec/lib/karafka/web/pro/commanding/commands/trace_spec.rb +++ b/spec/lib/karafka/web/pro/commanding/commands/trace_spec.rb @@ -4,7 +4,7 @@ # See LICENSE for details. RSpec.describe_current do - subject(:trace_command) { described_class.new } + subject(:trace_command) { described_class.new({}) } let(:dispatcher) { Karafka::Web::Pro::Commanding::Dispatcher } let(:test_thread) { Thread.new { sleep(0.5) } } diff --git a/spec/lib/karafka/web/pro/commanding/manager_spec.rb b/spec/lib/karafka/web/pro/commanding/manager_spec.rb index 22eccc96..388530d6 100644 --- a/spec/lib/karafka/web/pro/commanding/manager_spec.rb +++ b/spec/lib/karafka/web/pro/commanding/manager_spec.rb @@ -66,7 +66,7 @@ end context 'when command is trace' do - let(:trace_command) { Karafka::Web::Pro::Commanding::Commands::Trace.new } + let(:trace_command) { Karafka::Web::Pro::Commanding::Commands::Trace.new({}) } let(:command_name) { 'trace' } before do @@ -81,7 +81,7 @@ end context 'when command is quiet' do - let(:quiet_command) { Karafka::Web::Pro::Commanding::Commands::Quiet.new } + let(:quiet_command) { Karafka::Web::Pro::Commanding::Commands::Quiet.new({}) } let(:command_name) { 'quiet' } before do @@ -96,7 +96,7 @@ end context 'when command is stop' do - let(:stop_command) { Karafka::Web::Pro::Commanding::Commands::Stop.new } + let(:stop_command) { Karafka::Web::Pro::Commanding::Commands::Stop.new({}) } let(:command_name) { 'stop' } before do diff --git a/spec/lib/karafka/web/pro/ui/controllers/consumers/commanding_controller_spec.rb b/spec/lib/karafka/web/pro/ui/controllers/consumers/commanding_controller_spec.rb index 79cc7bef..184c54ec 100644 --- a/spec/lib/karafka/web/pro/ui/controllers/consumers/commanding_controller_spec.rb +++ b/spec/lib/karafka/web/pro/ui/controllers/consumers/commanding_controller_spec.rb @@ -25,7 +25,7 @@ sleep(1) message = Karafka::Admin.read_topic(commands_topic, 0, 1, -1).first expect(message.key).to eq(process_id) - expect(message.payload[:schema_version]).to eq('1.0.0') + expect(message.payload[:schema_version]).to eq('1.1.0') expect(message.payload[:type]).to eq('command') expect(message.payload[:dispatched_at]).not_to be_nil expect(message.payload[:command]).to eq(name: 'trace') @@ -47,7 +47,7 @@ sleep(1) message = Karafka::Admin.read_topic(commands_topic, 0, 1, -1).first expect(message.key).to eq(process_id) - expect(message.payload[:schema_version]).to eq('1.0.0') + expect(message.payload[:schema_version]).to eq('1.1.0') expect(message.payload[:type]).to eq('command') expect(message.payload[:dispatched_at]).not_to be_nil expect(message.payload[:command]).to eq(name: 'quiet') @@ -69,7 +69,7 @@ sleep(1) message = Karafka::Admin.read_topic(commands_topic, 0, 1, -1).first expect(message.key).to eq(process_id) - expect(message.payload[:schema_version]).to eq('1.0.0') + expect(message.payload[:schema_version]).to eq('1.1.0') expect(message.payload[:type]).to eq('command') expect(message.payload[:dispatched_at]).not_to be_nil expect(message.payload[:command]).to eq(name: 'stop') @@ -91,7 +91,7 @@ sleep(1) message = Karafka::Admin.read_topic(commands_topic, 0, 1, -1).first expect(message.key).to eq('*') - expect(message.payload[:schema_version]).to eq('1.0.0') + expect(message.payload[:schema_version]).to eq('1.1.0') expect(message.payload[:type]).to eq('command') expect(message.payload[:dispatched_at]).not_to be_nil expect(message.payload[:command]).to eq(name: 'quiet') @@ -113,7 +113,7 @@ sleep(1) message = Karafka::Admin.read_topic(commands_topic, 0, 1, -1).first expect(message.key).to eq('*') - expect(message.payload[:schema_version]).to eq('1.0.0') + expect(message.payload[:schema_version]).to eq('1.1.0') expect(message.payload[:type]).to eq('command') expect(message.payload[:dispatched_at]).not_to be_nil expect(message.payload[:command]).to eq(name: 'stop') diff --git a/spec/lib/karafka/web/pro/ui/controllers/consumers/commands_controller_spec.rb b/spec/lib/karafka/web/pro/ui/controllers/consumers/commands_controller_spec.rb index 61ac4d72..4e5c14bb 100644 --- a/spec/lib/karafka/web/pro/ui/controllers/consumers/commands_controller_spec.rb +++ b/spec/lib/karafka/web/pro/ui/controllers/consumers/commands_controller_spec.rb @@ -88,7 +88,7 @@ stop quiet ].each do |type| - data = Fixtures.consumers_commands_json("v1.0.0_#{type}", symbolize_names: false) + data = Fixtures.consumers_commands_json("v1.1.0_#{type}", symbolize_names: false) id = ['*', SecureRandom.uuid].sample data['process']['id'] = id produce(commands_topic, data.to_json, key: id) @@ -163,7 +163,7 @@ context "when visiting #{command} command" do before do topics_config.consumers.commands = commands_topic - produce(commands_topic, Fixtures.consumers_commands_file("v1.0.0_#{command}.json")) + produce(commands_topic, Fixtures.consumers_commands_file("v1.1.0_#{command}.json")) get 'consumers/commands/0' end @@ -183,7 +183,7 @@ context "when visiting #{command} command that is not with a compatible schema" do before do topics_config.consumers.commands = commands_topic - data = Fixtures.consumers_commands_json("v1.0.0_#{command}") + data = Fixtures.consumers_commands_json("v1.1.0_#{command}") data[:schema_version] = '0.0.1' produce(commands_topic, data.to_json) get 'consumers/commands/0' @@ -205,7 +205,7 @@ context 'when visiting trace result' do before do topics_config.consumers.commands = commands_topic - produce(commands_topic, Fixtures.consumers_commands_file('v1.0.0_trace_result.json')) + produce(commands_topic, Fixtures.consumers_commands_file('v1.1.0_trace_result.json')) get 'consumers/commands/0' end @@ -225,7 +225,7 @@ context 'when visiting trace result that is not with a compatible schema' do before do topics_config.consumers.commands = commands_topic - data = Fixtures.consumers_commands_json('v1.0.0_trace_result') + data = Fixtures.consumers_commands_json('v1.1.0_trace_result') data[:schema_version] = '0.0.1' produce(commands_topic, data.to_json) get 'consumers/commands/0'