diff --git a/spec/integration/cursor_reaping_spec.rb b/spec/integration/cursor_reaping_spec.rb index 1f35264a79..187b09c146 100644 --- a/spec/integration/cursor_reaping_spec.rb +++ b/spec/integration/cursor_reaping_spec.rb @@ -5,19 +5,23 @@ # in MRI, I don't currently know how to force GC to run in JRuby only_mri - let(:client) { subscribed_client } + let(:subscriber) { EventSubscriber.new } + + let(:client) do + authorized_client.tap do |client| + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + end + end + let(:collection) { client['cursor_reaping_spec'] } - before(:all) do + before do data = [{a: 1}] * 10 - ClientRegistry.instance.global_client('subscribed')['cursor_reaping_spec'].insert_many(data) + authorized_client['cursor_reaping_spec'].delete_many + authorized_client['cursor_reaping_spec'].insert_many(data) end context 'a no-timeout cursor' do - before do - EventSubscriber.clear_events! - end - it 'reaps nothing when we do not query' do # this is a base line test to ensure that the reaps in the other test # aren't done on some global cursor @@ -26,7 +30,7 @@ # just the scope, no query is happening collection.find.batch_size(2).no_cursor_timeout - events = EventSubscriber.started_events.select do |event| + events = subscriber.started_events.select do |event| event.command['killCursors'] end @@ -58,14 +62,14 @@ # force periodic executor to run because its frequency is not configurable client.cluster.instance_variable_get('@periodic_executor').execute - started_event = EventSubscriber.started_events.detect do |event| + started_event = subscriber.started_events.detect do |event| event.command['killCursors'] && event.command['cursors'].map { |c| Utils.int64_value(c) }.include?(cursor_id) end expect(started_event).not_to be_nil - succeeded_event = EventSubscriber.succeeded_events.detect do |event| + succeeded_event = subscriber.succeeded_events.detect do |event| event.command_name == 'killCursors' && event.request_id == started_event.request_id end diff --git a/spec/integration/get_more_spec.rb b/spec/integration/get_more_spec.rb index 33762b2cde..f3cb85157c 100644 --- a/spec/integration/get_more_spec.rb +++ b/spec/integration/get_more_spec.rb @@ -4,8 +4,16 @@ # https://jira.mongodb.org/browse/RUBY-1987 min_server_fcv '3.2' + let(:subscriber) { EventSubscriber.new } + + let(:client) do + authorized_client.tap do |client| + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + end + end + let(:collection) do - subscribed_client['get_more_spec'] + client['get_more_spec'] end let(:scope) do @@ -16,11 +24,10 @@ collection.delete_many collection.insert_one(a: 1) #collection.insert_one(a: 2) - EventSubscriber.clear_events! end let(:get_more_command) do - event = EventSubscriber.single_command_started_event('getMore') + event = subscriber.single_command_started_event('getMore') event.command['getMore'] end diff --git a/spec/integration/retryable_errors_spec.rb b/spec/integration/retryable_errors_spec.rb index 3931785325..4ea57e9ae4 100644 --- a/spec/integration/retryable_errors_spec.rb +++ b/spec/integration/retryable_errors_spec.rb @@ -4,8 +4,16 @@ # Requirement for fail point min_server_fcv '4.0' + let(:subscriber) { EventSubscriber.new } + + let(:client_options) do + {} + end + let(:client) do - subscribed_client + authorized_client.with(client_options).tap do |client| + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + end end let(:collection) do @@ -83,7 +91,7 @@ end let(:events) do - EventSubscriber.command_started_events('find') + subscriber.command_started_events('find') end end @@ -128,7 +136,7 @@ end let(:events) do - EventSubscriber.command_started_events('insert') + subscriber.command_started_events('insert') end end @@ -141,6 +149,7 @@ end it 'publishes two events' do + operation_exception expect(events.length).to eq(2) end @@ -155,6 +164,7 @@ end it 'publishes one event' do + operation_exception expect(events.length).to eq(1) end @@ -217,8 +227,8 @@ context 'modern read retries' do require_wired_tiger_on_36 - let(:client) do - subscribed_client.with(retry_reads: true) + let(:client_options) do + {retry_reads: true} end it_behaves_like 'failing retry' @@ -226,8 +236,8 @@ end context 'legacy read retries' do - let(:client) do - subscribed_client.with(retry_reads: false, read_retry_interval: 0) + let(:client_options) do + {retry_reads: false, read_retry_interval: 0} end it_behaves_like 'failing retry' @@ -236,8 +246,8 @@ end context 'when read retries are disabled' do - let(:client) do - subscribed_client.with(retry_reads: false, max_read_retries: 0) + let(:client_options) do + {retry_reads: false, max_read_retries: 0} end include_context 'read operation' @@ -252,8 +262,8 @@ context 'modern write retries' do require_wired_tiger_on_36 - let(:client) do - subscribed_client.with(retry_writes: true) + let(:client_options) do + {retry_writes: true} end it_behaves_like 'failing retry' @@ -261,8 +271,8 @@ end context 'legacy write' do - let(:client) do - subscribed_client.with(retry_writes: false) + let(:client_options) do + {retry_writes: false} end it_behaves_like 'failing retry' diff --git a/spec/integration/retryable_writes/retryable_writes_36_and_older_spec.rb b/spec/integration/retryable_writes/retryable_writes_36_and_older_spec.rb index f1db358830..4f9747c8ee 100644 --- a/spec/integration/retryable_writes/retryable_writes_36_and_older_spec.rb +++ b/spec/integration/retryable_writes/retryable_writes_36_and_older_spec.rb @@ -32,7 +32,7 @@ let(:check_collection) do # Verify data in the collection using another client instance to avoid # having the verification read trigger cluster scans on the writing client - subscribed_client[TEST_COLL] + root_authorized_client[TEST_COLL] end let(:primary_connection) do diff --git a/spec/integration/step_down_spec.rb b/spec/integration/step_down_spec.rb index 300e9263cb..c8d2e53e14 100644 --- a/spec/integration/step_down_spec.rb +++ b/spec/integration/step_down_spec.rb @@ -29,11 +29,11 @@ end end - let(:event_subscriber) { EventSubscriber.new } + let(:subscriber) { EventSubscriber.new } let(:test_client) do authorized_client_without_any_retries.with(server_selection_timeout: 20).tap do |client| - client.subscribe(Mongo::Monitoring::CONNECTION_POOL, event_subscriber) + client.subscribe(Mongo::Monitoring::CONNECTION_POOL, subscriber) end end @@ -48,8 +48,8 @@ let(:subscribed_client) do test_client.tap do |client| - client.subscribe(Mongo::Monitoring::COMMAND, EventSubscriber) - client.subscribe(Mongo::Monitoring::CONNECTION_POOL, EventSubscriber) + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + client.subscribe(Mongo::Monitoring::CONNECTION_POOL, subscriber) end end @@ -65,13 +65,13 @@ it 'continues through step down' do subscribed_client.cluster.next_primary.pool.clear - event_subscriber.clear_events! + subscriber.clear_events! # get the first item item = enum.next expect(item['test']).to eq(1) - connection_created_events = EventSubscriber.published_events.select do |event| + connection_created_events = subscriber.published_events.select do |event| event.is_a?(Mongo::Monitoring::Event::Cmap::ConnectionCreated) end expect(connection_created_events).not_to be_empty @@ -79,7 +79,7 @@ current_primary = subscribed_client.cluster.next_primary ClusterTools.instance.change_primary - EventSubscriber.clear_events! + subscriber.clear_events! # exhaust the batch 9.times do @@ -90,14 +90,14 @@ item = enum.next expect(item['test']).to eq(1) - get_more_events = EventSubscriber.started_events.select do |event| + get_more_events = subscriber.started_events.select do |event| event.command['getMore'] end expect(get_more_events.length).to eq(1) # getMore should have been sent on the same connection as find - connection_created_events = EventSubscriber.published_events.select do |event| + connection_created_events = subscriber.published_events.select do |event| event.is_a?(Mongo::Monitoring::Event::Cmap::ConnectionCreated) end expect(connection_created_events).to be_empty @@ -155,13 +155,13 @@ let(:fail_point_code) { 10107 } it 'keeps connection open' do - event_subscriber.clear_events! + subscriber.clear_events! expect do collection.insert_one(test: 1) end.to raise_error(Mongo::Error::OperationFailure, /10107/) - expect(event_subscriber.select_published_events(Mongo::Monitoring::Event::Cmap::PoolCleared).count).to eq(0) + expect(subscriber.select_published_events(Mongo::Monitoring::Event::Cmap::PoolCleared).count).to eq(0) end end @@ -174,13 +174,13 @@ let(:fail_point_code) { 10107 } it 'closes the connection' do - event_subscriber.clear_events! + subscriber.clear_events! expect do collection.insert_one(test: 1) end.to raise_error(Mongo::Error::OperationFailure, /10107/) - expect(event_subscriber.select_published_events(Mongo::Monitoring::Event::Cmap::PoolCleared).count).to eq(1) + expect(subscriber.select_published_events(Mongo::Monitoring::Event::Cmap::PoolCleared).count).to eq(1) end end @@ -191,13 +191,13 @@ let(:fail_point_code) { 11600 } it 'closes the connection' do - event_subscriber.clear_events! + subscriber.clear_events! expect do collection.insert_one(test: 1) end.to raise_error(Mongo::Error::OperationFailure, /11600/) - expect(event_subscriber.select_published_events(Mongo::Monitoring::Event::Cmap::PoolCleared).count).to eq(1) + expect(subscriber.select_published_events(Mongo::Monitoring::Event::Cmap::PoolCleared).count).to eq(1) end end end diff --git a/spec/lite_spec_helper.rb b/spec/lite_spec_helper.rb index 5ba9d24587..9738ebf4f2 100644 --- a/spec/lite_spec_helper.rb +++ b/spec/lite_spec_helper.rb @@ -136,8 +136,6 @@ end end -EventSubscriber.initialize - if SpecConfig.instance.active_support? require "active_support/time" require 'mongo/active_support' diff --git a/spec/mongo/client_construction_spec.rb b/spec/mongo/client_construction_spec.rb index f44356d5db..2f78f318f5 100644 --- a/spec/mongo/client_construction_spec.rb +++ b/spec/mongo/client_construction_spec.rb @@ -3,6 +3,8 @@ describe Mongo::Client do clean_slate + let(:subscriber) { EventSubscriber.new } + describe '.new' do context 'with scan: false' do it 'does not perform i/o' do @@ -1253,7 +1255,7 @@ it 'copies monitoring subscribers' do monitoring.subscribers.clear - client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, EventSubscriber.new) + client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, subscriber) expect(monitoring.present_subscribers.length).to eq(1) expect(monitoring.subscribers[Mongo::Monitoring::SERVER_HEARTBEAT].length).to eq(1) @@ -1264,12 +1266,12 @@ it 'does not change subscribers on original client' do monitoring.subscribers.clear - client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, EventSubscriber.new) + client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, subscriber) expect(monitoring.present_subscribers.length).to eq(1) expect(monitoring.subscribers[Mongo::Monitoring::SERVER_HEARTBEAT].length).to eq(1) - new_client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, EventSubscriber.new) - new_client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, EventSubscriber.new) + new_client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, subscriber) + new_client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, subscriber) expect(new_monitoring.present_subscribers.length).to eq(1) expect(new_monitoring.subscribers[Mongo::Monitoring::SERVER_HEARTBEAT].length).to eq(3) # original client should not have gotten any of the new subscribers @@ -1297,7 +1299,7 @@ it 'resets monitoring subscribers' do monitoring.subscribers.clear - client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, EventSubscriber.new) + client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, subscriber) expect(monitoring.present_subscribers.length).to eq(1) expect(monitoring.subscribers[Mongo::Monitoring::SERVER_HEARTBEAT].length).to eq(1) @@ -1310,12 +1312,12 @@ it 'does not change subscribers on original client' do monitoring.subscribers.clear - client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, EventSubscriber.new) + client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, subscriber) expect(monitoring.present_subscribers.length).to eq(1) expect(monitoring.subscribers[Mongo::Monitoring::SERVER_HEARTBEAT].length).to eq(1) - new_client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, EventSubscriber.new) - new_client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, EventSubscriber.new) + new_client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, subscriber) + new_client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, subscriber) # 7 default subscribers + heartbeat expect(new_monitoring.present_subscribers.length).to eq(8) # the heartbeat subscriber on the original client is not inherited @@ -1758,7 +1760,6 @@ # in #with, the consistent behavior is to never transfer sdam_proc to # the new client. context 'when sdam_proc is given on original client' do - let(:subscriber) { EventSubscriber.new } let(:sdam_proc) do Proc.new do |client| diff --git a/spec/mongo/client_spec.rb b/spec/mongo/client_spec.rb index 2debad7aca..a3fd9d02b2 100644 --- a/spec/mongo/client_spec.rb +++ b/spec/mongo/client_spec.rb @@ -600,16 +600,18 @@ root_authorized_client.options.merge(heartbeat_frequency: 100, monitoring: true) end + let(:subscriber) { EventSubscriber.new } + let(:client) do ClientRegistry.instance.new_local_client( SpecConfig.instance.addresses, client_options ).tap do |cl| - cl.subscribe(Mongo::Monitoring::COMMAND, EventSubscriber.clear_events!) + cl.subscribe(Mongo::Monitoring::COMMAND, subscriber) end end let(:command) do - EventSubscriber.started_events.find { |c| c.command_name == 'listDatabases' }.command + subscriber.started_events.find { |c| c.command_name == 'listDatabases' }.command end before do diff --git a/spec/mongo/collection/view/aggregation_spec.rb b/spec/mongo/collection/view/aggregation_spec.rb index 8e34553b90..e749332555 100644 --- a/spec/mongo/collection/view/aggregation_spec.rb +++ b/spec/mongo/collection/view/aggregation_spec.rb @@ -232,8 +232,12 @@ { session: session } end + let(:subscriber) { EventSubscriber.new } + let(:client) do - subscribed_client + authorized_client.tap do |client| + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + end end let(:session) do @@ -246,7 +250,7 @@ let(:command) do aggregation.explain - EventSubscriber.started_events.find { |c| c.command_name == 'aggregate'}.command + subscriber.started_events.find { |c| c.command_name == 'aggregate'}.command end it 'sends the session id' do diff --git a/spec/mongo/collection/view/map_reduce_spec.rb b/spec/mongo/collection/view/map_reduce_spec.rb index 1984d01433..ce4999d0da 100644 --- a/spec/mongo/collection/view/map_reduce_spec.rb +++ b/spec/mongo/collection/view/map_reduce_spec.rb @@ -239,12 +239,16 @@ Mongo::Collection::View.new(client[TEST_COLL], selector, view_options) end + let(:subscriber) { EventSubscriber.new } + let(:client) do - subscribed_client + authorized_client.tap do |client| + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + end end let(:find_command) do - EventSubscriber.started_events[-1].command + subscriber.started_events[-1].command end before do diff --git a/spec/mongo/collection/view/readable_spec.rb b/spec/mongo/collection/view/readable_spec.rb index fa669252ae..fbea6caec2 100644 --- a/spec/mongo/collection/view/readable_spec.rb +++ b/spec/mongo/collection/view/readable_spec.rb @@ -206,7 +206,7 @@ let(:command) do operation - EventSubscriber.started_events.find { |cmd| cmd.command_name == 'mapReduce' }.command + subscriber.started_events.find { |cmd| cmd.command_name == 'mapReduce' }.command end it_behaves_like 'an operation supporting causally consistent reads' diff --git a/spec/mongo/collection_spec.rb b/spec/mongo/collection_spec.rb index c3f66afbee..4d4b6a2ab2 100644 --- a/spec/mongo/collection_spec.rb +++ b/spec/mongo/collection_spec.rb @@ -2,8 +2,18 @@ describe Mongo::Collection do + let(:subscriber) { EventSubscriber.new } + + let(:client) do + authorized_client.tap do |client| + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + end + end + + let(:authorized_collection) { client['collection_spec'] } + before do - authorized_collection.drop + authorized_client['collection_spec'].drop end let(:collection_invalid_write_concern) do @@ -14,10 +24,6 @@ authorized_client[:validating] end - let(:client) do - authorized_client - end - describe '#==' do let(:database) do @@ -246,6 +252,7 @@ authorized_client.with(client_options).tap do |client| expect(client.options[:read]).to eq(Mongo::Options::Redacted.new( mode: :primary_preferred)) + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) end end @@ -278,12 +285,6 @@ {read: { mode: :primary_preferred }} end - let(:subscriber) { EventSubscriber.new } - - before do - client.subscribe(Mongo::Monitoring::COMMAND, subscriber) - end - shared_examples_for "uses collection's read preference when reading" do it "uses collection's read preference when reading" do expect do @@ -1206,10 +1207,6 @@ { session: session } end - let(:client) do - subscribed_client - end - let(:session) do client.start_session end @@ -1220,7 +1217,7 @@ let(:command) do client[TEST_COLL].find({}, session: session).explain - EventSubscriber.started_events.find { |c| c.command_name == 'explain' }.command + subscriber.started_events.find { |c| c.command_name == 'explain' }.command end it 'sends the session id' do @@ -1237,7 +1234,7 @@ let(:command) do operation - EventSubscriber.started_events.find { |cmd| cmd.command_name == 'find' }.command + subscriber.started_events.find { |cmd| cmd.command_name == 'find' }.command end it_behaves_like 'an operation supporting causally consistent reads' @@ -1437,7 +1434,7 @@ context 'when unacknowledged writes is used with an implicit session' do let(:collection_with_unacknowledged_write_concern) do - subscribed_client.with(write: { w: 0 })[TEST_COLL] + client.with(write: { w: 0 })[TEST_COLL] end let(:operation) do @@ -1519,20 +1516,16 @@ def generate context 'when the documents are sent with OP_MSG' do min_server_fcv '3.6' - let(:client) do - subscribed_client - end - let(:documents) do [{ '_id' => 1, 'name' => '1'*16777191 }, { '_id' => 'y' }] end before do - client[TEST_COLL].insert_many(documents) + authorized_collection.insert_many(documents) end let(:insert_events) do - EventSubscriber.started_events.select { |e| e.command_name == 'insert' } + subscriber.started_events.select { |e| e.command_name == 'insert' } end it 'sends the documents in one OP_MSG' do @@ -1680,7 +1673,7 @@ def generate context 'when unacknowledged writes is used with an implicit session' do let(:collection_with_unacknowledged_write_concern) do - subscribed_client.with(write: { w: 0 })[TEST_COLL] + client.with(write: { w: 0 })[TEST_COLL] end let(:operation) do @@ -1890,7 +1883,7 @@ def generate let(:command) do operation - EventSubscriber.started_events.find { |cmd| cmd.command_name == 'aggregate' }.command + subscriber.started_events.find { |cmd| cmd.command_name == 'aggregate' }.command end it_behaves_like 'an operation supporting causally consistent reads' @@ -2095,7 +2088,7 @@ def generate let(:command) do operation - EventSubscriber.started_events.find { |cmd| cmd.command_name == 'count' }.command + subscriber.started_events.find { |cmd| cmd.command_name == 'count' }.command end it_behaves_like 'an operation supporting causally consistent reads' @@ -2213,7 +2206,7 @@ def generate let(:command) do operation - EventSubscriber.started_events.find { |cmd| cmd.command_name == 'distinct' }.command + subscriber.started_events.find { |cmd| cmd.command_name == 'distinct' }.command end it_behaves_like 'an operation supporting causally consistent reads' @@ -2375,7 +2368,7 @@ def generate context 'when unacknowledged writes is used with an implicit session' do let(:collection_with_unacknowledged_write_concern) do - subscribed_client.with(write: { w: 0 })[TEST_COLL] + client.with(write: { w: 0 })[TEST_COLL] end let(:operation) do @@ -2547,7 +2540,7 @@ def generate it_behaves_like 'a failed operation using a session' end - context 'when unacknowledged writes is used with an explicit session' do + context 'when unacknowledged writes are used with an explicit session' do let(:collection_with_unacknowledged_write_concern) do authorized_collection.with(write: { w: 0 }) @@ -2560,10 +2553,10 @@ def generate it_behaves_like 'an explicit session with an unacknowledged write' end - context 'when unacknowledged writes is used with an implicit session' do + context 'when unacknowledged writes are used with an implicit session' do let(:collection_with_unacknowledged_write_concern) do - subscribed_client.with(write: { w: 0 })[TEST_COLL] + client.with(write: { w: 0 })[TEST_COLL] end let(:operation) do @@ -2731,7 +2724,6 @@ def generate end context 'when a session is not provided' do - let(:client) { subscribed_client } let(:collection) { client['test'] } let(:cursors) do @@ -2748,7 +2740,7 @@ def generate let(:command) do operation - event = EventSubscriber.started_events.find { |cmd| cmd.command_name == 'parallelCollectionScan' } + event = subscriber.started_events.find { |cmd| cmd.command_name == 'parallelCollectionScan' } expect(event).not_to be_nil event.command end @@ -2770,7 +2762,7 @@ def generate let(:command) do operation - event = EventSubscriber.started_events.find { |cmd| cmd.command_name == 'parallelCollectionScan' } + event = subscriber.started_events.find { |cmd| cmd.command_name == 'parallelCollectionScan' } expect(event).not_to be_nil event.command end @@ -3173,7 +3165,7 @@ def generate context 'when unacknowledged writes is used with an implicit session' do let(:collection_with_unacknowledged_write_concern) do - subscribed_client.with(write: { w: 0 })[TEST_COLL] + client.with(write: { w: 0 })[TEST_COLL] end let(:operation) do @@ -3595,7 +3587,7 @@ def generate context 'when unacknowledged writes is used with an implicit session' do let(:collection_with_unacknowledged_write_concern) do - subscribed_client.with(write: { w: 0 })[TEST_COLL] + client.with(write: { w: 0 })[TEST_COLL] end let(:operation) do @@ -3953,10 +3945,6 @@ def generate context 'when the documents are sent with OP_MSG' do min_server_fcv '3.6' - let(:client) do - subscribed_client - end - let(:documents) do [{ '_id' => 1, 'name' => '1'*16777191 }, { '_id' => 'y' }] end @@ -3967,7 +3955,7 @@ def generate end let(:update_events) do - EventSubscriber.started_events.select { |e| e.command_name == 'update' } + subscriber.started_events.select { |e| e.command_name == 'update' } end it 'sends the documents in one OP_MSG' do @@ -4017,7 +4005,7 @@ def generate context 'when unacknowledged writes is used with an implicit session' do let(:collection_with_unacknowledged_write_concern) do - subscribed_client.with(write: { w: 0 })[TEST_COLL] + client.with(write: { w: 0 })[TEST_COLL] end let(:operation) do diff --git a/spec/mongo/cursor_spec.rb b/spec/mongo/cursor_spec.rb index c7ae50b896..e2bfcbda86 100644 --- a/spec/mongo/cursor_spec.rb +++ b/spec/mongo/cursor_spec.rb @@ -434,6 +434,14 @@ context 'when an implicit session is used' do min_server_fcv '3.6' + let(:subscriber) { EventSubscriber.new } + + let(:subscribed_client) do + authorized_client.tap do |client| + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + end + end + let(:collection) do subscribed_client[TEST_COLL] end @@ -456,7 +464,7 @@ end let(:find_events) do - EventSubscriber.started_events.select { |e| e.command_name == "find" } + subscriber.started_events.select { |e| e.command_name == "find" } end context 'when all results are retrieved in the first response' do diff --git a/spec/mongo/database_spec.rb b/spec/mongo/database_spec.rb index d0523435b0..d017c161a2 100644 --- a/spec/mongo/database_spec.rb +++ b/spec/mongo/database_spec.rb @@ -335,8 +335,12 @@ client.start_session end + let(:subscriber) { EventSubscriber.new } + let(:client) do - subscribed_client + authorized_client.tap do |client| + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + end end it_behaves_like 'an operation using a session' @@ -344,7 +348,7 @@ let(:full_command) do - EventSubscriber.started_events.find { |cmd| cmd.command_name == 'ismaster' }.command + subscriber.started_events.find { |cmd| cmd.command_name == 'ismaster' }.command end it 'does not add a afterClusterTime field' do diff --git a/spec/mongo/index/view_spec.rb b/spec/mongo/index/view_spec.rb index 664031add3..30d21d759b 100644 --- a/spec/mongo/index/view_spec.rb +++ b/spec/mongo/index/view_spec.rb @@ -265,10 +265,14 @@ let(:subscriber) { EventSubscriber.new } - before do - authorized_collection.client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + let(:client) do + authorized_client.tap do |client| + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + end end + let(:authorized_collection) { client['view-subscribed'] } + context 'when commit_quorum value is supported' do let!(:result) do view.create_many( @@ -278,13 +282,17 @@ ) end + let(:events) do + subscriber.command_started_events('createIndexes') + end + it 'returns ok' do expect(result).to be_successful end it 'passes the commit_quorum option to the server' do - expect(subscriber.started_events.length).to eq(1) - command = subscriber.started_events.first.command + expect(events.length).to eq(1) + command = events.first.command expect(command['commitQuorum']).to eq('majority') end end @@ -772,10 +780,14 @@ let(:subscriber) { EventSubscriber.new } - before do - authorized_collection.client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + let(:client) do + authorized_client.tap do |client| + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + end end + let(:authorized_collection) { client['view-subscribed'] } + let(:indexes) do authorized_collection.indexes.get('x_1') end @@ -791,9 +803,13 @@ expect(indexes).to_not be_nil end + let(:events) do + subscriber.command_started_events('createIndexes') + end + it 'passes the commit_quorum option to the server' do - expect(subscriber.started_events.length).to eq(1) - command = subscriber.started_events.first.command + expect(events.length).to eq(1) + command = events.first.command expect(command['commitQuorum']).to eq('majority') end end diff --git a/spec/mongo/session/session_pool_spec.rb b/spec/mongo/session/session_pool_spec.rb index 79dc617a40..41256fbddc 100644 --- a/spec/mongo/session/session_pool_spec.rb +++ b/spec/mongo/session/session_pool_spec.rb @@ -157,8 +157,12 @@ pool.checkout end + let(:subscriber) { EventSubscriber.new } + let(:client) do - subscribed_client + authorized_client.tap do |client| + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + end end context 'when the number of ids is not larger than 10,000' do @@ -175,7 +179,7 @@ let(:end_sessions_command) do pool.end_sessions - EventSubscriber.started_events.find { |c| c.command_name == 'endSessions'} + subscriber.started_events.find { |c| c.command_name == 'endSessions'} end it 'sends the endSessions command with all the session ids' do @@ -219,7 +223,7 @@ end let(:end_sessions_commands) do - EventSubscriber.started_events.select { |c| c.command_name == 'endSessions'} + subscriber.started_events.select { |c| c.command_name == 'endSessions'} end it 'sends the command more than once' do diff --git a/spec/runners/change_streams/test.rb b/spec/runners/change_streams/test.rb index 44d875ea2d..82cfec379f 100644 --- a/spec/runners/change_streams/test.rb +++ b/spec/runners/change_streams/test.rb @@ -86,7 +86,8 @@ def setup_test setup_fail_point(client) - client.subscribe(Mongo::Monitoring::COMMAND, EventSubscriber.clear_events!) + @subscriber = EventSubscriber.new + client.subscribe(Mongo::Monitoring::COMMAND, @subscriber) @target = case @target_type when 'client' @@ -183,7 +184,7 @@ def global_client end def events - EventSubscriber.started_events.reduce([]) do |evs, e| + @subscriber.started_events.reduce([]) do |evs, e| next evs if IGNORE_COMMANDS.include?(e.command_name) command = e.command.dup diff --git a/spec/support/authorization.rb b/spec/support/authorization.rb index 95a9161b48..3ae6be247d 100644 --- a/spec/support/authorization.rb +++ b/spec/support/authorization.rb @@ -65,12 +65,10 @@ def self.included(context) # # @since 2.5.1 context.let(:authorized_client_with_retry_writes) do - EventSubscriber.clear_events! ClientRegistry.instance.global_client('authorized_with_retry_writes') end context.let(:authorized_client_without_retry_writes) do - EventSubscriber.clear_events! ClientRegistry.instance.global_client('authorized_without_retry_writes') end @@ -86,14 +84,6 @@ def self.included(context) ClientRegistry.instance.global_client('authorized_without_any_retries') end - # Provides an authorized mongo client that has a Command subscriber. - # - # @since 2.5.1 - context.let(:subscribed_client) do - EventSubscriber.clear_events! - ClientRegistry.instance.global_client('subscribed') - end - # Provides an unauthorized mongo client on the default test database. # # @since 2.0.0 diff --git a/spec/support/client_registry.rb b/spec/support/client_registry.rb index 85f7f2f4b2..028f4e5620 100644 --- a/spec/support/client_registry.rb +++ b/spec/support/client_registry.rb @@ -116,9 +116,7 @@ def new_global_client(name) global_client('authorized').with( retry_writes: true, server_selection_timeout: 4.97, - ).tap do |client| - client.subscribe(Mongo::Monitoring::COMMAND, EventSubscriber) - end + ) # Provides an authorized mongo client that uses legacy read retry logic. when 'authorized_without_retry_reads' global_client('authorized').with( @@ -137,18 +135,14 @@ def new_global_client(name) global_client('authorized').with( retry_writes: false, server_selection_timeout: 4.99, - ).tap do |client| - client.subscribe(Mongo::Monitoring::COMMAND, EventSubscriber) - end + ) # Provides an authorized mongo client that does not retry writes # using either modern or legacy mechanisms. when 'authorized_without_any_retry_writes' global_client('authorized').with( retry_writes: false, max_write_retries: 0, server_selection_timeout: 4.99, - ).tap do |client| - client.subscribe(Mongo::Monitoring::COMMAND, EventSubscriber) - end + ) # Provides an authorized mongo client that does not retry reads or writes # at all. when 'authorized_without_any_retries' @@ -187,19 +181,6 @@ def new_global_client(name) SpecConfig.instance.addresses, SpecConfig.instance.test_options.merge(client_options), ) - # A client that has an event subscriber for commands. - when 'subscribed' - Mongo::Client.new( - SpecConfig.instance.addresses, - SpecConfig.instance.test_options.merge( - database: SpecConfig.instance.test_db, - ).merge(SpecConfig.instance.credentials_or_external_user( - user: SpecConfig.instance.test_user.name, - password: SpecConfig.instance.test_user.password, - )) - ).tap do |client| - client.subscribe(Mongo::Monitoring::COMMAND, EventSubscriber) - end else raise "Don't know how to construct global client #{name}" end diff --git a/spec/support/event_subscriber.rb b/spec/support/event_subscriber.rb index ea18505064..4f8c7885a3 100644 --- a/spec/support/event_subscriber.rb +++ b/spec/support/event_subscriber.rb @@ -3,122 +3,120 @@ # @since 2.5.0 class EventSubscriber - module Impl - - # The started events. - # - # @since 2.5.0 - attr_reader :started_events - - # The succeeded events. - # - # @since 2.5.0 - attr_reader :succeeded_events - - # The failed events. - # - # @since 2.5.0 - attr_reader :failed_events - - attr_reader :published_events - - # Cache the succeeded event. - # - # @param [ Event ] event The event. - # - # @since 2.5.0 - def succeeded(event) - @mutex.synchronize do - succeeded_events.push(event) - end + # The started events. + # + # @since 2.5.0 + attr_reader :started_events + + # The succeeded events. + # + # @since 2.5.0 + attr_reader :succeeded_events + + # The failed events. + # + # @since 2.5.0 + attr_reader :failed_events + + attr_reader :published_events + + # Cache the succeeded event. + # + # @param [ Event ] event The event. + # + # @since 2.5.0 + def succeeded(event) + @mutex.synchronize do + succeeded_events.push(event) end + end - # Cache the started event. - # - # @param [ Event ] event The event. - # - # @since 2.5.0 - def started(event) - @mutex.synchronize do - started_events.push(event) - end + # Cache the started event. + # + # @param [ Event ] event The event. + # + # @since 2.5.0 + def started(event) + @mutex.synchronize do + started_events.push(event) end + end - # Filters command started events for the specified command name. - def command_started_events(command_name) - started_events.select do |event| - event.command[command_name] - end + # Filters command started events for the specified command name. + def command_started_events(command_name) + started_events.select do |event| + event.command[command_name] end + end - # Locates command stated events for the specified command name, - # asserts that there is exactly one such event, and returns it. - def single_command_started_event(command_name) - events = started_events.select do |event| - event.command[command_name] - end - if events.length != 1 - raise "Expected a single #{command_name} event but we have #{events.length}" + def non_auth_command_started_events + started_events.reject do |event| + %w(authenticate getnonce saslSstart saslContinue).any? do |cmd| + event.command[cmd] end - events.first end + end - def select_started_events(cls) - @started_events.select do |event| - event.is_a?(cls) - end + # Locates command stated events for the specified command name, + # asserts that there is exactly one such event, and returns it. + def single_command_started_event(command_name) + events = started_events.select do |event| + event.command[command_name] end - - def select_succeeded_events(cls) - @succeeded_events.select do |event| - event.is_a?(cls) - end + if events.length != 1 + raise "Expected a single #{command_name} event but we have #{events.length}" end + events.first + end - # Cache the failed event. - # - # @param [ Event ] event The event. - # - # @since 2.5.0 - def failed(event) - @mutex.synchronize do - failed_events.push(event) - end + def select_started_events(cls) + @started_events.select do |event| + event.is_a?(cls) end + end - def select_published_events(cls) - @published_events.select do |event| - event.is_a?(cls) - end + def select_succeeded_events(cls) + @succeeded_events.select do |event| + event.is_a?(cls) end + end - def published(event) - @mutex.synchronize do - @published_events << event - end + # Cache the failed event. + # + # @param [ Event ] event The event. + # + # @since 2.5.0 + def failed(event) + @mutex.synchronize do + failed_events.push(event) end + end - # Clear all cached events. - # - # @since 2.5.1 - def clear_events! - @started_events = [] - @succeeded_events = [] - @failed_events = [] - @published_events = [] - self + def select_published_events(cls) + @published_events.select do |event| + event.is_a?(cls) end + end - def initialize - @mutex = Mutex.new - clear_events! + def published(event) + @mutex.synchronize do + @published_events << event end end - include Impl + # Clear all cached events. + # + # @since 2.5.1 + def clear_events! + @started_events = [] + @succeeded_events = [] + @failed_events = [] + @published_events = [] + self + end - class << self - include Impl - public :initialize + def initialize + @mutex = Mutex.new + clear_events! end end diff --git a/spec/support/shared/session.rb b/spec/support/shared/session.rb index 7d64e96ea5..6d5f653182 100644 --- a/spec/support/shared/session.rb +++ b/spec/support/shared/session.rb @@ -123,10 +123,6 @@ shared_examples 'an explicit session with an unacknowledged write' do - before do - EventSubscriber.clear_events! - end - context 'when sessions are supported' do min_server_fcv '3.6' @@ -135,8 +131,10 @@ end it 'does not add a session id to the operation' do + subscriber.clear_events! operation - expect(EventSubscriber.started_events.collect(&:command).collect { |cmd| cmd['lsid'] }.compact).to be_empty + subscriber.non_auth_command_started_events.length.should == 1 + expect(subscriber.non_auth_command_started_events.collect(&:command).collect { |cmd| cmd['lsid'] }.compact).to be_empty end end @@ -149,24 +147,24 @@ it 'does not add a session id to the operation' do expect(Mongo::Session).not_to receive(:new) + subscriber.clear_events! operation - expect(EventSubscriber.started_events.collect(&:command).collect { |cmd| cmd['lsid'] }.compact).to be_empty + subscriber.non_auth_command_started_events.length.should == 1 + expect(subscriber.non_auth_command_started_events.collect(&:command).collect { |cmd| cmd['lsid'] }.compact).to be_empty end end end shared_examples 'an implicit session with an unacknowledged write' do - before do - EventSubscriber.clear_events! - end - context 'when sessions are supported' do min_server_fcv '3.6' it 'does not add a session id to the operation' do + subscriber.clear_events! operation - expect(EventSubscriber.started_events.collect(&:command).collect { |cmd| cmd['lsid'] }.compact).to be_empty + subscriber.non_auth_command_started_events.length.should == 1 + expect(subscriber.non_auth_command_started_events.collect(&:command).collect { |cmd| cmd['lsid'] }.compact).to be_empty end end @@ -174,16 +172,22 @@ max_server_version '3.4' it 'does not add a session id to the operation' do + subscriber.clear_events! operation - expect(EventSubscriber.started_events.collect(&:command).collect { |cmd| cmd['lsid'] }.compact).to be_empty + subscriber.non_auth_command_started_events.length.should == 1 + expect(subscriber.non_auth_command_started_events.collect(&:command).collect { |cmd| cmd['lsid'] }.compact).to be_empty end end end shared_examples 'an operation supporting causally consistent reads' do + let(:subscriber) { EventSubscriber.new } + let(:client) do - subscribed_client + authorized_client.tap do |client| + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + end end context 'when connected to a standalone' do @@ -615,8 +619,12 @@ client.start_session end + let(:subscriber) { EventSubscriber.new } + let(:client) do - subscribed_client + authorized_client.tap do |client| + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + end end shared_examples_for 'does not update the cluster time of the cluster' do @@ -637,7 +645,7 @@ let(:reply_cluster_time) do operation_with_session - EventSubscriber.succeeded_events[-1].reply['$clusterTime'] + subscriber.succeeded_events[-1].reply['$clusterTime'] end it 'updates the cluster time of the cluster', retry: 3 do @@ -660,7 +668,7 @@ let!(:reply_cluster_time) do operation_with_session - EventSubscriber.succeeded_events[-1].reply['$clusterTime'] + subscriber.succeeded_events[-1].reply['$clusterTime'] end it_behaves_like 'does not update the cluster time of the cluster' @@ -681,7 +689,7 @@ let(:reply_cluster_time) do operation - EventSubscriber.succeeded_events[-1].reply['$clusterTime'] + subscriber.succeeded_events[-1].reply['$clusterTime'] end it_behaves_like 'does not update the cluster time of the cluster' @@ -692,7 +700,7 @@ let(:reply_cluster_time) do operation_with_session - EventSubscriber.succeeded_events[-1].reply['$clusterTime'] + subscriber.succeeded_events[-1].reply['$clusterTime'] end context 'when the cluster is sharded or a replica set' do @@ -707,7 +715,7 @@ let(:second_command_cluster_time) do second_operation - EventSubscriber.started_events[-1].command['$clusterTime'] + subscriber.non_auth_command_started_events[-1].command['$clusterTime'] end context 'when the advanced cluster time is greater than the existing cluster time' do @@ -745,7 +753,7 @@ let(:second_command_cluster_time) do second_operation - EventSubscriber.started_events[-1].command['$clusterTime'] + subscriber.non_auth_command_started_events[-1].command['$clusterTime'] end it 'includes the received cluster time in the second command', retry: 3 do @@ -765,7 +773,7 @@ let(:second_command_cluster_time) do second_operation - EventSubscriber.started_events[-1].command['$clusterTime'] + subscriber.non_auth_command_started_events[-1].command['$clusterTime'] end it 'does not update the cluster time of the cluster' do