From 7e18887a325508a9ef6805d33481f4f91239b799 Mon Sep 17 00:00:00 2001 From: Dominik Maier Date: Mon, 20 Jan 2025 20:28:19 +0100 Subject: [PATCH] Remove shmem associated type (#2870) * reduce shm trait bound * Rename to SendExiting * alpha beta gamam * alphabet * work * std only --------- Co-authored-by: Romain Malmain --- fuzzers/binary_only/qemu_cmin/src/fuzzer.rs | 2 +- .../binary_only/qemu_coverage/src/fuzzer.rs | 2 +- libafl/src/events/centralized.rs | 36 +- libafl/src/events/llmp/mgr.rs | 47 +- libafl/src/events/llmp/restarting.rs | 22 +- libafl/src/events/mod.rs | 28 +- libafl/src/events/simple.rs | 14 +- libafl/src/events/tcp.rs | 13 +- libafl_bolts/src/llmp.rs | 820 +++++++++--------- libafl_libfuzzer/runtime/src/merge.rs | 2 +- 10 files changed, 513 insertions(+), 473 deletions(-) diff --git a/fuzzers/binary_only/qemu_cmin/src/fuzzer.rs b/fuzzers/binary_only/qemu_cmin/src/fuzzer.rs index 1644c09d55..72ad553881 100644 --- a/fuzzers/binary_only/qemu_cmin/src/fuzzer.rs +++ b/fuzzers/binary_only/qemu_cmin/src/fuzzer.rs @@ -7,7 +7,7 @@ use std::{env, fmt::Write, io, path::PathBuf, process, ptr::NonNull}; use clap::{builder::Str, Parser}; use libafl::{ corpus::{Corpus, InMemoryOnDiskCorpus, NopCorpus}, - events::{EventRestarter, ManagerExit, SimpleRestartingEventManager}, + events::{EventRestarter, SendExiting, SimpleRestartingEventManager}, executors::ExitKind, feedbacks::MaxMapFeedback, fuzzer::StdFuzzer, diff --git a/fuzzers/binary_only/qemu_coverage/src/fuzzer.rs b/fuzzers/binary_only/qemu_coverage/src/fuzzer.rs index 5bb670362e..df2f174561 100644 --- a/fuzzers/binary_only/qemu_coverage/src/fuzzer.rs +++ b/fuzzers/binary_only/qemu_coverage/src/fuzzer.rs @@ -9,7 +9,7 @@ use clap::{builder::Str, Parser}; use libafl::{ corpus::{Corpus, InMemoryCorpus}, events::{ - launcher::Launcher, ClientDescription, EventConfig, LlmpRestartingEventManager, ManagerExit, + launcher::Launcher, ClientDescription, EventConfig, LlmpRestartingEventManager, SendExiting, }, 
executors::ExitKind, fuzzer::StdFuzzer, diff --git a/libafl/src/events/centralized.rs b/libafl/src/events/centralized.rs index 63e80359d7..0515f1c6d7 100644 --- a/libafl/src/events/centralized.rs +++ b/libafl/src/events/centralized.rs @@ -18,27 +18,28 @@ use libafl_bolts::{ }; use libafl_bolts::{ llmp::{LlmpClient, LlmpClientDescription, Tag}, - shmem::{NopShMem, NopShMemProvider, ShMem, ShMemProvider}, + shmem::{ShMem, ShMemProvider}, tuples::{Handle, MatchNameRef}, ClientId, }; use serde::{de::DeserializeOwned, Serialize}; -use super::{CanSerializeObserver, ManagerExit, NopEventManager}; +use super::AwaitRestartSafe; #[cfg(feature = "llmp_compression")] use crate::events::llmp::COMPRESS_THRESHOLD; use crate::{ common::HasMetadata, events::{ serialize_observers_adaptive, std_maybe_report_progress, std_report_progress, - AdaptiveSerializer, Event, EventConfig, EventFirer, EventManagerHooksTuple, EventManagerId, - EventProcessor, EventRestarter, HasEventManagerId, LogSeverity, ProgressReporter, + AdaptiveSerializer, CanSerializeObserver, Event, EventConfig, EventFirer, + EventManagerHooksTuple, EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, + LogSeverity, ProgressReporter, SendExiting, }, executors::HasObservers, fuzzer::{EvaluatorObservers, ExecutionProcessor}, - inputs::{Input, NopInput}, + inputs::Input, observers::TimeObserver, - state::{HasExecutions, HasLastReportTime, MaybeHasClientPerfMonitor, NopState, Stoppable}, + state::{HasExecutions, HasLastReportTime, MaybeHasClientPerfMonitor, Stoppable}, Error, }; @@ -58,16 +59,7 @@ pub struct CentralizedEventManager { phantom: PhantomData<(I, S)>, } -impl - CentralizedEventManager< - NopEventManager, - (), - NopInput, - NopState, - NopShMem, - NopShMemProvider, - > -{ +impl CentralizedEventManager<(), (), (), (), (), ()> { /// Creates a builder for [`CentralizedEventManager`] #[must_use] pub fn builder() -> CentralizedEventManagerBuilder { @@ -291,7 +283,7 @@ impl CanSerializeObserver for 
CentralizedEventManager where EM: AdaptiveSerializer, - OT: Serialize + MatchNameRef, + OT: MatchNameRef + Serialize, { fn serialize_observers(&mut self, observers: &OT) -> Result>, Error> { serialize_observers_adaptive::( @@ -303,9 +295,9 @@ where } } -impl ManagerExit for CentralizedEventManager +impl SendExiting for CentralizedEventManager where - EM: ManagerExit, + EM: SendExiting, SHM: ShMem, SP: ShMemProvider, { @@ -313,7 +305,13 @@ where self.client.sender_mut().send_exiting()?; self.inner.send_exiting() } +} +impl AwaitRestartSafe for CentralizedEventManager +where + SHM: ShMem, + EM: AwaitRestartSafe, +{ #[inline] fn await_restart_safe(&mut self) { self.client.await_safe_to_unmap_blocking(); diff --git a/libafl/src/events/llmp/mgr.rs b/libafl/src/events/llmp/mgr.rs index de348130f9..be794116be 100644 --- a/libafl/src/events/llmp/mgr.rs +++ b/libafl/src/events/llmp/mgr.rs @@ -18,7 +18,7 @@ use libafl_bolts::{ use libafl_bolts::{ current_time, llmp::{LlmpClient, LlmpClientDescription, LLMP_FLAG_FROM_MM}, - shmem::{NopShMem, NopShMemProvider, ShMem, ShMemProvider}, + shmem::{NopShMem, ShMem, ShMemProvider}, tuples::Handle, ClientId, }; @@ -38,18 +38,18 @@ use crate::events::{serialize_observers_adaptive, CanSerializeObserver}; use crate::{ events::{ llmp::{LLMP_TAG_EVENT_TO_BOTH, _LLMP_TAG_EVENT_TO_BROKER}, - std_maybe_report_progress, std_on_restart, std_report_progress, AdaptiveSerializer, Event, - EventConfig, EventFirer, EventManagerHooksTuple, EventManagerId, EventProcessor, - EventRestarter, HasEventManagerId, ManagerExit, ProgressReporter, + std_maybe_report_progress, std_on_restart, std_report_progress, AdaptiveSerializer, + AwaitRestartSafe, Event, EventConfig, EventFirer, EventManagerHooksTuple, EventManagerId, + EventProcessor, EventRestarter, HasEventManagerId, ProgressReporter, SendExiting, }, executors::HasObservers, fuzzer::{EvaluatorObservers, ExecutionProcessor}, - inputs::{Input, NopInput}, + inputs::Input, observers::TimeObserver, 
stages::HasCurrentStageId, state::{ HasCurrentTestcase, HasExecutions, HasImported, HasLastReportTime, HasSolutions, - MaybeHasClientPerfMonitor, NopState, Stoppable, + MaybeHasClientPerfMonitor, Stoppable, }, Error, HasMetadata, }; @@ -62,7 +62,6 @@ const INITIAL_EVENT_BUFFER_SIZE: usize = 1024 * 4; pub struct LlmpEventManager where SHM: ShMem, - SP: ShMemProvider, { /// We only send 1 testcase for every `throttle` second pub(crate) throttle: Option, @@ -82,11 +81,11 @@ where serializations_cnt: usize, should_serialize_cnt: usize, pub(crate) time_ref: Option>, - phantom: PhantomData<(I, S)>, event_buffer: Vec, + phantom: PhantomData<(I, S)>, } -impl LlmpEventManager<(), NopState, NopInput, NopShMem, NopShMemProvider> { +impl LlmpEventManager<(), (), (), NopShMem, ()> { /// Creates a builder for [`LlmpEventManager`] #[must_use] pub fn builder() -> LlmpEventManagerBuilder<()> { @@ -143,7 +142,6 @@ impl LlmpEventManagerBuilder { ) -> Result, Error> where SHM: ShMem, - SP: ShMemProvider, { Ok(LlmpEventManager { throttle: self.throttle, @@ -158,8 +156,8 @@ impl LlmpEventManagerBuilder { serializations_cnt: 0, should_serialize_cnt: 0, time_ref, - phantom: PhantomData, event_buffer: Vec::with_capacity(INITIAL_EVENT_BUFFER_SIZE), + phantom: PhantomData, }) } @@ -217,11 +215,10 @@ impl LlmpEventManagerBuilder { } #[cfg(feature = "std")] -impl CanSerializeObserver for LlmpEventManager +impl CanSerializeObserver for LlmpEventManager where - OT: Serialize + MatchNameRef, + OT: MatchNameRef + Serialize, SHM: ShMem, - SP: ShMemProvider, { fn serialize_observers(&mut self, observers: &OT) -> Result>, Error> { serialize_observers_adaptive::(self, observers, 2, 80) @@ -231,7 +228,6 @@ where impl AdaptiveSerializer for LlmpEventManager where SHM: ShMem, - SP: ShMemProvider, { fn serialization_time(&self) -> Duration { self.serialization_time @@ -267,7 +263,7 @@ where impl Debug for LlmpEventManager where SHM: ShMem, - SP: ShMemProvider, + SP: Debug, { fn fmt(&self, f: &mut 
core::fmt::Formatter<'_>) -> core::fmt::Result { let mut debug_struct = f.debug_struct("LlmpEventManager"); @@ -277,7 +273,6 @@ where let debug = debug.field("compressor", &self.compressor); debug .field("configuration", &self.configuration) - .field("phantom", &self.phantom) .finish_non_exhaustive() } } @@ -285,7 +280,6 @@ where impl Drop for LlmpEventManager where SHM: ShMem, - SP: ShMemProvider, { /// LLMP clients will have to wait until their pages are mapped by somebody. fn drop(&mut self) { @@ -296,7 +290,6 @@ where impl LlmpEventManager where SHM: ShMem, - SP: ShMemProvider, { /// Calling this function will tell the llmp broker that this client is exiting /// This should be called from the restarter not from the actual fuzzer client @@ -330,7 +323,12 @@ where log::debug!("Asking he broker to be disconnected"); Ok(()) } +} +impl LlmpEventManager +where + SHM: ShMem, +{ /// Describe the client event manager's LLMP parts in a restorable fashion pub fn describe(&self) -> Result { self.llmp.describe() @@ -347,7 +345,6 @@ where impl LlmpEventManager where SHM: ShMem, - SP: ShMemProvider, { // Handle arriving events in the client fn handle_in_client( @@ -516,23 +513,26 @@ impl EventRestarter for LlmpEventManager, { fn on_restart(&mut self, state: &mut S) -> Result<(), Error> { std_on_restart(self, state) } } -impl ManagerExit for LlmpEventManager +impl SendExiting for LlmpEventManager where - SHM: ShMem, SHM: ShMem, SP: ShMemProvider, { fn send_exiting(&mut self) -> Result<(), Error> { self.llmp.sender_mut().send_exiting() } +} +impl AwaitRestartSafe for LlmpEventManager +where + SHM: ShMem, +{ /// The LLMP client needs to wait until a broker has mapped all pages before shutting down. /// Otherwise, the OS may already have removed the shared maps. fn await_restart_safe(&mut self) { @@ -621,7 +621,6 @@ where impl HasEventManagerId for LlmpEventManager where SHM: ShMem, - SP: ShMemProvider, { /// Gets the id assigned to this staterestorer. 
fn mgr_id(&self) -> EventManagerId { diff --git a/libafl/src/events/llmp/restarting.rs b/libafl/src/events/llmp/restarting.rs index bbdc971263..a63d7e86dd 100644 --- a/libafl/src/events/llmp/restarting.rs +++ b/libafl/src/events/llmp/restarting.rs @@ -35,10 +35,10 @@ use crate::{ common::HasMetadata, events::{ launcher::ClientDescription, serialize_observers_adaptive, std_maybe_report_progress, - std_report_progress, AdaptiveSerializer, CanSerializeObserver, Event, EventConfig, - EventFirer, EventManagerHooksTuple, EventManagerId, EventProcessor, EventRestarter, - HasEventManagerId, LlmpEventManager, LlmpShouldSaveState, ManagerExit, ProgressReporter, - StdLlmpEventHook, + std_report_progress, AdaptiveSerializer, AwaitRestartSafe, CanSerializeObserver, Event, + EventConfig, EventFirer, EventManagerHooksTuple, EventManagerId, EventProcessor, + EventRestarter, HasEventManagerId, LlmpEventManager, LlmpShouldSaveState, ProgressReporter, + SendExiting, StdLlmpEventHook, }, executors::HasObservers, fuzzer::{EvaluatorObservers, ExecutionProcessor}, @@ -58,7 +58,6 @@ use crate::{ pub struct LlmpRestartingEventManager where SHM: ShMem, - SP: ShMemProvider, { /// The embedded LLMP event manager llmp_mgr: LlmpEventManager, @@ -71,7 +70,6 @@ where impl AdaptiveSerializer for LlmpRestartingEventManager where SHM: ShMem, - SP: ShMemProvider, { fn serialization_time(&self) -> Duration { self.llmp_mgr.serialization_time() @@ -151,9 +149,8 @@ where impl CanSerializeObserver for LlmpRestartingEventManager where - OT: Serialize + MatchNameRef, + OT: MatchNameRef + Serialize, SHM: ShMem, - SP: ShMemProvider, { fn serialize_observers(&mut self, observers: &OT) -> Result>, Error> { serialize_observers_adaptive::(self, observers, 2, 80) @@ -187,7 +184,7 @@ where } } -impl ManagerExit for LlmpRestartingEventManager +impl SendExiting for LlmpRestartingEventManager where SHM: ShMem, SP: ShMemProvider, @@ -198,7 +195,12 @@ where // This way, the broker can clean up the pages, and eventually 
exit. self.llmp_mgr.send_exiting() } +} +impl AwaitRestartSafe for LlmpRestartingEventManager +where + SHM: ShMem, +{ /// The llmp client needs to wait until a broker mapped all pages, before shutting down. /// Otherwise, the OS may already have removed the shared maps, #[inline] @@ -331,9 +333,9 @@ pub fn setup_restarting_mgr_std( Error, > where + I: DeserializeOwned, MT: Monitor + Clone, S: Serialize + DeserializeOwned, - I: DeserializeOwned, { RestartingMgr::builder() .shmem_provider(StdShMemProvider::new()?) diff --git a/libafl/src/events/mod.rs b/libafl/src/events/mod.rs index f5672fd9f6..e2db05dc6d 100644 --- a/libafl/src/events/mod.rs +++ b/libafl/src/events/mod.rs @@ -564,11 +564,9 @@ pub trait EventRestarter { /// Default implementation of [`EventRestarter::on_restart`] for implementors with the given /// constraints -pub fn std_on_restart( - restarter: &mut (impl EventRestarter + ManagerExit), - state: &mut S, -) -> Result<(), Error> +pub fn std_on_restart(restarter: &mut EM, state: &mut S) -> Result<(), Error> where + EM: EventRestarter + AwaitRestartSafe, S: HasCurrentStageId, { state.on_restart()?; @@ -582,11 +580,15 @@ pub trait CanSerializeObserver { fn serialize_observers(&mut self, observers: &OT) -> Result>, Error>; } -/// Routines called before exiting -pub trait ManagerExit { +/// Send that we're about to exit +pub trait SendExiting { /// Send information that this client is exiting. /// No need to restart us any longer, and no need to print an error, either. fn send_exiting(&mut self) -> Result<(), Error>; +} + +/// Wait until it's safe to restart +pub trait AwaitRestartSafe { /// Block until we are safe to exit, usually called inside `on_restart`. fn await_restart_safe(&mut self); } @@ -640,12 +642,15 @@ where } } -impl ManagerExit for NopEventManager { +impl SendExiting for NopEventManager { /// Send information that this client is exiting. /// No need to restart us any longer, and no need to print an error, either. 
fn send_exiting(&mut self) -> Result<(), Error> { Ok(()) } +} + +impl AwaitRestartSafe for NopEventManager { /// Block until we are safe to exit, usually called inside `on_restart`. fn await_restart_safe(&mut self) {} } @@ -761,15 +766,20 @@ where } } -impl ManagerExit for MonitorTypedEventManager +impl SendExiting for MonitorTypedEventManager where - EM: ManagerExit, + EM: SendExiting, { #[inline] fn send_exiting(&mut self) -> Result<(), Error> { self.inner.send_exiting() } +} +impl AwaitRestartSafe for MonitorTypedEventManager +where + EM: AwaitRestartSafe, +{ #[inline] fn await_restart_safe(&mut self) { self.inner.await_restart_safe(); diff --git a/libafl/src/events/simple.rs b/libafl/src/events/simple.rs index 35e2cdb8de..c9dcbf0eb1 100644 --- a/libafl/src/events/simple.rs +++ b/libafl/src/events/simple.rs @@ -22,14 +22,14 @@ use libafl_bolts::{ use serde::de::DeserializeOwned; use serde::Serialize; -use super::{std_on_restart, ProgressReporter}; +use super::{std_on_restart, AwaitRestartSafe, ProgressReporter}; #[cfg(all(unix, feature = "std", not(miri)))] use crate::events::EVENTMGR_SIGHANDLER_STATE; use crate::{ events::{ std_maybe_report_progress, std_report_progress, BrokerEventResult, CanSerializeObserver, Event, EventFirer, EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, - ManagerExit, + SendExiting, }, monitors::Monitor, stages::HasCurrentStageId, @@ -90,11 +90,13 @@ where } } -impl ManagerExit for SimpleEventManager { +impl SendExiting for SimpleEventManager { fn send_exiting(&mut self) -> Result<(), Error> { Ok(()) } +} +impl AwaitRestartSafe for SimpleEventManager { fn await_restart_safe(&mut self) {} } @@ -341,7 +343,7 @@ where } #[cfg(feature = "std")] -impl ManagerExit for SimpleRestartingEventManager +impl SendExiting for SimpleRestartingEventManager where SHM: ShMem, SP: ShMemProvider, @@ -350,6 +352,10 @@ where self.staterestorer.send_exiting(); Ok(()) } +} + +#[cfg(feature = "std")] +impl AwaitRestartSafe for 
SimpleRestartingEventManager { /// Block until we are safe to exit, usually called inside `on_restart`. #[inline] fn await_restart_safe(&mut self) {} diff --git a/libafl/src/events/tcp.rs b/libafl/src/events/tcp.rs index fb53d7bd87..5dc36b5a7d 100644 --- a/libafl/src/events/tcp.rs +++ b/libafl/src/events/tcp.rs @@ -38,7 +38,7 @@ use tokio::{ }; use typed_builder::TypedBuilder; -use super::{std_maybe_report_progress, std_report_progress, ManagerExit}; +use super::{std_maybe_report_progress, std_report_progress, AwaitRestartSafe, SendExiting}; #[cfg(feature = "share_objectives")] use crate::corpus::{Corpus, Testcase}; #[cfg(all(unix, not(miri)))] @@ -776,14 +776,16 @@ where } } -impl ManagerExit for TcpEventManager { +impl AwaitRestartSafe for TcpEventManager { /// The TCP client needs to wait until a broker has mapped all pages before shutting down. /// Otherwise, the OS may already have removed the shared maps. fn await_restart_safe(&mut self) { // wait until we can drop the message safely. //self.tcp.await_safe_to_unmap_blocking(); } +} +impl SendExiting for TcpEventManager { fn send_exiting(&mut self) -> Result<(), Error> { //TODO: Should not be needed since TCP does that for us //self.tcp.sender.send_exiting() @@ -866,7 +868,7 @@ where } } -impl ManagerExit for TcpRestartingEventManager +impl SendExiting for TcpRestartingEventManager where SHM: ShMem, SP: ShMemProvider, @@ -877,7 +879,12 @@ where // This way, the broker can clean up the pages, and eventually exit. self.tcp_mgr.send_exiting() } +} +impl AwaitRestartSafe for TcpRestartingEventManager +where + SHM: ShMem, +{ /// The tcp client needs to wait until a broker mapped all pages, before shutting down. /// Otherwise, the OS may already have removed the shared maps, #[inline] diff --git a/libafl_bolts/src/llmp.rs b/libafl_bolts/src/llmp.rs index c820fda748..bc64a0a515 100644 --- a/libafl_bolts/src/llmp.rs +++ b/libafl_bolts/src/llmp.rs @@ -950,47 +950,6 @@ where }) } - /// ID of this sender. 
- #[must_use] - pub fn id(&self) -> ClientId { - self.id - } - - /// Completely reset the current sender map. - /// Afterwards, no receiver should read from it at a different location. - /// This is only useful if all connected llmp parties start over, for example after a crash. - /// - /// # Safety - /// Only safe if you really really restart the page on everything connected - /// No receiver should read from this page at a different location. - pub unsafe fn reset(&mut self) { - llmp_page_init( - &mut self.out_shmems.last_mut().unwrap().shmem, - self.id, - true, - ); - self.last_msg_sent = ptr::null_mut(); - } - - /// Reads the stored sender / client id for the given `env_name` (by appending `_CLIENT_ID`). - /// If the content of the env is `_NULL`, returns [`Option::None`]. - #[cfg(feature = "std")] - #[inline] - fn client_id_from_env(env_name: &str) -> Result, Error> { - let client_id_str = env::var(format!("{env_name}_CLIENT_ID"))?; - Ok(if client_id_str == _NULL_ENV_STR { - None - } else { - Some(ClientId(client_id_str.parse()?)) - }) - } - - /// Writes the `id` to an env var - #[cfg(feature = "std")] - fn client_id_to_env(env_name: &str, id: ClientId) { - env::set_var(format!("{env_name}_CLIENT_ID"), format!("{}", id.0)); - } - /// Reattach to a vacant `out_shmem`, to with a previous sender stored the information in an env before. #[cfg(feature = "std")] pub fn on_existing_from_env(mut shmem_provider: SP, env_name: &str) -> Result { @@ -1010,56 +969,6 @@ where Ok(ret) } - /// Store the info to this sender to env. - /// A new client can reattach to it using [`LlmpSender::on_existing_from_env()`]. - #[cfg(feature = "std")] - pub fn to_env(&self, env_name: &str) -> Result<(), Error> { - let current_out_shmem = self.out_shmems.last().unwrap(); - current_out_shmem.shmem.write_to_env(env_name)?; - Self::client_id_to_env(env_name, self.id); - unsafe { current_out_shmem.msg_to_env(self.last_msg_sent, env_name) } - } - - /// Waits for this sender to be save to unmap. 
- /// If a receiver is involved, this function should always be called. - pub fn await_safe_to_unmap_blocking(&self) { - #[cfg(feature = "std")] - let mut ctr = 0_u16; - loop { - if self.safe_to_unmap() { - return; - } - hint::spin_loop(); - // We log that we're looping -> see when we're blocking. - #[cfg(feature = "std")] - { - ctr = ctr.wrapping_add(1); - if ctr == 0 { - log::info!("Awaiting safe_to_unmap_blocking"); - } - } - } - } - - /// If we are allowed to unmap this client - pub fn safe_to_unmap(&self) -> bool { - let current_out_shmem = self.out_shmems.last().unwrap(); - unsafe { - // log::info!("Reading safe_to_unmap from {:?}", current_out_shmem.page() as *const _); - (*current_out_shmem.page()) - .receivers_joined_count - .load(Ordering::Relaxed) - >= 1 - } - } - - /// For debug purposes: Mark save to unmap, even though it might not have been read by a receiver yet. - /// # Safety - /// If this method is called, the page may be unmapped before it is read by any receiver. - pub unsafe fn mark_safe_to_unmap(&mut self) { - (*self.out_shmems.last_mut().unwrap().page_mut()).receiver_joined(); - } - /// Reattach to a vacant `out_shmem`. /// It is essential, that the receiver (or someone else) keeps a pointer to this map /// else reattach will get a new, empty page, from the OS, or fail. @@ -1135,169 +1044,6 @@ where } } - /// Intern: Special allocation function for `EOP` messages (and nothing else!) - /// The normal alloc will fail if there is not enough space for `buf_len_padded + EOP` - /// So if [`alloc_next`] fails, create new page if necessary, use this function, - /// place `EOP`, commit `EOP`, reset, alloc again on the new space. - unsafe fn alloc_eop(&mut self) -> Result<*mut LlmpMsg, Error> { - let map = self.out_shmems.last_mut().unwrap(); - let page = map.page_mut(); - let last_msg = self.last_msg_sent; - assert!((*page).size_used + EOP_MSG_SIZE <= (*page).size_total, - "PROGRAM ABORT : BUG: EOP does not fit in page! 
page {page:?}, size_current {:?}, size_total {:?}", - &raw const (*page).size_used, &raw const (*page).size_total); - - let ret: *mut LlmpMsg = if last_msg.is_null() { - (*page).messages.as_mut_ptr() - } else { - llmp_next_msg_ptr_checked(map, last_msg, EOP_MSG_SIZE)? - }; - assert!( - (*ret).tag != LLMP_TAG_UNINITIALIZED, - "Did not call send() on last message!" - ); - - (*ret).buf_len = size_of::() as u64; - - // We don't need to pad the EOP message: it'll always be the last in this page. - (*ret).buf_len_padded = (*ret).buf_len; - (*ret).message_id = if last_msg.is_null() { - MessageId(1) - } else { - MessageId((*last_msg).message_id.0 + 1) - }; - (*ret).tag = LLMP_TAG_END_OF_PAGE; - (*page).size_used += EOP_MSG_SIZE; - Ok(ret) - } - - /// Intern: Will return a ptr to the next msg buf, or None if map is full. - /// Never call [`alloc_next`] without either sending or cancelling the last allocated message for this page! - /// There can only ever be up to one message allocated per page at each given time. - unsafe fn alloc_next_if_space(&mut self, buf_len: usize) -> Option<*mut LlmpMsg> { - let map = self.out_shmems.last_mut().unwrap(); - let page = map.page_mut(); - let last_msg = self.last_msg_sent; - - assert!( - !self.has_unsent_message, - "Called alloc without calling send inbetween" - ); - - #[cfg(feature = "llmp_debug")] - log::info!( - "Allocating {} bytes on page {:?} / map {:?} (last msg: {:?})", - buf_len, - page, - &map.shmem.id().as_str(), - last_msg - ); - - let msg_start = (*page).messages.as_mut_ptr() as usize + (*page).size_used; - - // Make sure the end of our msg is aligned. 
- let buf_len_padded = llmp_align(msg_start + buf_len + size_of::()) - - msg_start - - size_of::(); - - #[cfg(feature = "llmp_debug")] - log::trace!( - "{page:?} {:?} size_used={:x} buf_len_padded={:x} EOP_MSG_SIZE={:x} size_total={}", - &(*page), - (*page).size_used, - buf_len_padded, - EOP_MSG_SIZE, - (*page).size_total - ); - - // For future allocs, keep track of the maximum (aligned) alloc size we used - (*page).max_alloc_size = max( - (*page).max_alloc_size, - size_of::() + buf_len_padded, - ); - - // We need enough space for the current page size_used + payload + padding - if (*page).size_used + size_of::() + buf_len_padded + EOP_MSG_SIZE - > (*page).size_total - { - #[cfg(feature = "llmp_debug")] - log::info!("LLMP: Page full."); - - /* We're full. */ - return None; - } - - let ret = msg_start as *mut LlmpMsg; - - /* We need to start with 1 for ids, as current message id is initialized - * with 0... */ - (*ret).message_id = if last_msg.is_null() { - MessageId(1) - } else if (*page).current_msg_id.load(Ordering::Relaxed) == (*last_msg).message_id.0 { - MessageId((*last_msg).message_id.0 + 1) - } else { - /* Oops, wrong usage! */ - panic!("BUG: The current message never got committed using send! (page->current_msg_id {:?}, last_msg->message_id: {:?})", &raw const (*page).current_msg_id, (*last_msg).message_id); - }; - - (*ret).buf_len = buf_len as u64; - (*ret).buf_len_padded = buf_len_padded as u64; - (*page).size_used += size_of::() + buf_len_padded; - - (*llmp_next_msg_ptr(ret)).tag = LLMP_TAG_UNSET; - (*ret).tag = LLMP_TAG_UNINITIALIZED; - - self.has_unsent_message = true; - - Some(ret) - } - - /// Commit the message last allocated by [`alloc_next`] to the queue. - /// After commiting, the msg shall no longer be altered! 
- /// It will be read by the consuming threads (`broker->clients` or `client->broker`) - /// If `overwrite_client_id` is `false`, the message's `sender` won't be touched (for broker forwarding) - #[inline(never)] // Not inlined to make cpu-level reodering (hopefully?) improbable - unsafe fn send(&mut self, msg: *mut LlmpMsg, overwrite_client_id: bool) -> Result<(), Error> { - // log::info!("Sending msg {:?}", msg); - - assert!(self.last_msg_sent != msg, "Message sent twice!"); - assert!( - (*msg).tag != LLMP_TAG_UNSET, - "No tag set on message with id {:?}", - (*msg).message_id - ); - // A client gets the sender id assigned to by the broker during the initial handshake. - if overwrite_client_id { - (*msg).sender = self.id; - } - let page = self.out_shmems.last_mut().unwrap().page_mut(); - if msg.is_null() || !llmp_msg_in_page(page, msg) { - return Err(Error::unknown(format!( - "Llmp Message {msg:?} is null or not in current page" - ))); - } - - let mid = (*page).current_msg_id.load(Ordering::Relaxed) + 1; - (*msg).message_id.0 = mid; - - // Make sure all things have been written to the page, and commit the message to the page - (*page) - .current_msg_id - .store((*msg).message_id.0, Ordering::Release); - - self.last_msg_sent = msg; - self.has_unsent_message = false; - - log::debug!( - "[{} - {:#x}] Send message with id {}", - self.id.0, - ptr::from_ref::(self) as u64, - mid - ); - - Ok(()) - } - /// Grab an unused `LlmpSharedMap` from `unused_shmem_cache` or allocate a new map, /// if no suitable maps could be found. unsafe fn new_or_unused_shmem( @@ -1546,20 +1292,6 @@ where } } - /// Describe this [`LlmpClient`] in a way that it can be restored later, using [`Self::on_existing_from_description`]. - pub fn describe(&self) -> Result { - let map = self.out_shmems.last().unwrap(); - let last_message_offset = if self.last_msg_sent.is_null() { - None - } else { - Some(unsafe { map.msg_to_offset(self.last_msg_sent) }?) 
- }; - Ok(LlmpDescription { - shmem: map.shmem.description(), - last_message_offset, - }) - } - /// Create this client on an existing map from the given description. /// Acquired with [`self.describe`]. pub fn on_existing_from_description( @@ -1581,6 +1313,279 @@ where } } +impl LlmpSender +where + SHM: ShMem, +{ + /// ID of this sender. + #[must_use] + pub fn id(&self) -> ClientId { + self.id + } + + /// Completely reset the current sender map. + /// Afterwards, no receiver should read from it at a different location. + /// This is only useful if all connected llmp parties start over, for example after a crash. + /// + /// # Safety + /// Only safe if you really really restart the page on everything connected + /// No receiver should read from this page at a different location. + pub unsafe fn reset(&mut self) { + llmp_page_init( + &mut self.out_shmems.last_mut().unwrap().shmem, + self.id, + true, + ); + self.last_msg_sent = ptr::null_mut(); + } + + /// Reads the stored sender / client id for the given `env_name` (by appending `_CLIENT_ID`). + /// If the content of the env is `_NULL`, returns [`Option::None`]. + #[cfg(feature = "std")] + #[inline] + fn client_id_from_env(env_name: &str) -> Result, Error> { + let client_id_str = env::var(format!("{env_name}_CLIENT_ID"))?; + Ok(if client_id_str == _NULL_ENV_STR { + None + } else { + Some(ClientId(client_id_str.parse()?)) + }) + } + + /// Writes the `id` to an env var + #[cfg(feature = "std")] + fn client_id_to_env(env_name: &str, id: ClientId) { + env::set_var(format!("{env_name}_CLIENT_ID"), format!("{}", id.0)); + } + + /// Store the info to this sender to env. + /// A new client can reattach to it using [`LlmpSender::on_existing_from_env()`]. 
+ #[cfg(feature = "std")] + pub fn to_env(&self, env_name: &str) -> Result<(), Error> { + let current_out_shmem = self.out_shmems.last().unwrap(); + current_out_shmem.shmem.write_to_env(env_name)?; + Self::client_id_to_env(env_name, self.id); + unsafe { current_out_shmem.msg_to_env(self.last_msg_sent, env_name) } + } + + /// Waits for this sender to be save to unmap. + /// If a receiver is involved, this function should always be called. + pub fn await_safe_to_unmap_blocking(&self) { + #[cfg(feature = "std")] + let mut ctr = 0_u16; + loop { + if self.safe_to_unmap() { + return; + } + hint::spin_loop(); + // We log that we're looping -> see when we're blocking. + #[cfg(feature = "std")] + { + ctr = ctr.wrapping_add(1); + if ctr == 0 { + log::info!("Awaiting safe_to_unmap_blocking"); + } + } + } + } + + /// If we are allowed to unmap this client + pub fn safe_to_unmap(&self) -> bool { + let current_out_shmem = self.out_shmems.last().unwrap(); + unsafe { + // log::info!("Reading safe_to_unmap from {:?}", current_out_shmem.page() as *const _); + (*current_out_shmem.page()) + .receivers_joined_count + .load(Ordering::Relaxed) + >= 1 + } + } + + /// For debug purposes: Mark save to unmap, even though it might not have been read by a receiver yet. + /// # Safety + /// If this method is called, the page may be unmapped before it is read by any receiver. + pub unsafe fn mark_safe_to_unmap(&mut self) { + (*self.out_shmems.last_mut().unwrap().page_mut()).receiver_joined(); + } + + /// Intern: Special allocation function for `EOP` messages (and nothing else!) + /// The normal alloc will fail if there is not enough space for `buf_len_padded + EOP` + /// So if [`alloc_next`] fails, create new page if necessary, use this function, + /// place `EOP`, commit `EOP`, reset, alloc again on the new space. 
+ unsafe fn alloc_eop(&mut self) -> Result<*mut LlmpMsg, Error> { + let map = self.out_shmems.last_mut().unwrap(); + let page = map.page_mut(); + let last_msg = self.last_msg_sent; + assert!((*page).size_used + EOP_MSG_SIZE <= (*page).size_total, + "PROGRAM ABORT : BUG: EOP does not fit in page! page {page:?}, size_current {:?}, size_total {:?}", + &raw const (*page).size_used, &raw const (*page).size_total); + + let ret: *mut LlmpMsg = if last_msg.is_null() { + (*page).messages.as_mut_ptr() + } else { + llmp_next_msg_ptr_checked(map, last_msg, EOP_MSG_SIZE)? + }; + assert!( + (*ret).tag != LLMP_TAG_UNINITIALIZED, + "Did not call send() on last message!" + ); + + (*ret).buf_len = size_of::() as u64; + + // We don't need to pad the EOP message: it'll always be the last in this page. + (*ret).buf_len_padded = (*ret).buf_len; + (*ret).message_id = if last_msg.is_null() { + MessageId(1) + } else { + MessageId((*last_msg).message_id.0 + 1) + }; + (*ret).tag = LLMP_TAG_END_OF_PAGE; + (*page).size_used += EOP_MSG_SIZE; + Ok(ret) + } + + /// Intern: Will return a ptr to the next msg buf, or None if map is full. + /// Never call [`alloc_next`] without either sending or cancelling the last allocated message for this page! + /// There can only ever be up to one message allocated per page at each given time. + unsafe fn alloc_next_if_space(&mut self, buf_len: usize) -> Option<*mut LlmpMsg> { + let map = self.out_shmems.last_mut().unwrap(); + let page = map.page_mut(); + let last_msg = self.last_msg_sent; + + assert!( + !self.has_unsent_message, + "Called alloc without calling send inbetween" + ); + + #[cfg(feature = "llmp_debug")] + log::info!( + "Allocating {} bytes on page {:?} / map {:?} (last msg: {:?})", + buf_len, + page, + &map.shmem.id().as_str(), + last_msg + ); + + let msg_start = (*page).messages.as_mut_ptr() as usize + (*page).size_used; + + // Make sure the end of our msg is aligned. 
+        let buf_len_padded = llmp_align(msg_start + buf_len + size_of::<LlmpMsg>())
+            - msg_start
+            - size_of::<LlmpMsg>();
+
+        #[cfg(feature = "llmp_debug")]
+        log::trace!(
+            "{page:?} {:?} size_used={:x} buf_len_padded={:x} EOP_MSG_SIZE={:x} size_total={}",
+            &(*page),
+            (*page).size_used,
+            buf_len_padded,
+            EOP_MSG_SIZE,
+            (*page).size_total
+        );
+
+        // For future allocs, keep track of the maximum (aligned) alloc size we used
+        (*page).max_alloc_size = max(
+            (*page).max_alloc_size,
+            size_of::<LlmpMsg>() + buf_len_padded,
+        );
+
+        // We need enough space for the current page size_used + payload + padding
+        if (*page).size_used + size_of::<LlmpMsg>() + buf_len_padded + EOP_MSG_SIZE
+            > (*page).size_total
+        {
+            #[cfg(feature = "llmp_debug")]
+            log::info!("LLMP: Page full.");
+
+            /* We're full. */
+            return None;
+        }
+
+        let ret = msg_start as *mut LlmpMsg;
+
+        /* We need to start with 1 for ids, as current message id is initialized
+         * with 0... */
+        (*ret).message_id = if last_msg.is_null() {
+            MessageId(1)
+        } else if (*page).current_msg_id.load(Ordering::Relaxed) == (*last_msg).message_id.0 {
+            MessageId((*last_msg).message_id.0 + 1)
+        } else {
+            /* Oops, wrong usage! */
+            panic!("BUG: The current message never got committed using send! (page->current_msg_id {:?}, last_msg->message_id: {:?})", &raw const (*page).current_msg_id, (*last_msg).message_id);
+        };
+
+        (*ret).buf_len = buf_len as u64;
+        (*ret).buf_len_padded = buf_len_padded as u64;
+        (*page).size_used += size_of::<LlmpMsg>() + buf_len_padded;
+
+        (*llmp_next_msg_ptr(ret)).tag = LLMP_TAG_UNSET;
+        (*ret).tag = LLMP_TAG_UNINITIALIZED;
+
+        self.has_unsent_message = true;
+
+        Some(ret)
+    }
+
+    /// Commit the message last allocated by [`alloc_next`] to the queue.
+    /// After committing, the msg shall no longer be altered!
+    /// It will be read by the consuming threads (`broker->clients` or `client->broker`)
+    /// If `overwrite_client_id` is `false`, the message's `sender` won't be touched (for broker forwarding)
+    #[inline(never)] // Not inlined to make cpu-level reordering (hopefully?) improbable
+    unsafe fn send(&mut self, msg: *mut LlmpMsg, overwrite_client_id: bool) -> Result<(), Error> {
+        // log::info!("Sending msg {:?}", msg);
+
+        assert!(self.last_msg_sent != msg, "Message sent twice!");
+        assert!(
+            (*msg).tag != LLMP_TAG_UNSET,
+            "No tag set on message with id {:?}",
+            (*msg).message_id
+        );
+        // A client gets the sender id assigned to by the broker during the initial handshake.
+        if overwrite_client_id {
+            (*msg).sender = self.id;
+        }
+        let page = self.out_shmems.last_mut().unwrap().page_mut();
+        if msg.is_null() || !llmp_msg_in_page(page, msg) {
+            return Err(Error::unknown(format!(
+                "Llmp Message {msg:?} is null or not in current page"
+            )));
+        }
+
+        let mid = (*page).current_msg_id.load(Ordering::Relaxed) + 1;
+        (*msg).message_id.0 = mid;
+
+        // Make sure all things have been written to the page, and commit the message to the page
+        (*page)
+            .current_msg_id
+            .store((*msg).message_id.0, Ordering::Release);
+
+        self.last_msg_sent = msg;
+        self.has_unsent_message = false;
+
+        log::debug!(
+            "[{} - {:#x}] Send message with id {}",
+            self.id.0,
+            ptr::from_ref::<Self>(self) as u64,
+            mid
+        );
+
+        Ok(())
+    }
+
+    /// Describe this [`LlmpClient`] in a way that it can be restored later, using [`Self::on_existing_from_description`].
+    pub fn describe(&self) -> Result<LlmpDescription, Error> {
+        let map = self.out_shmems.last().unwrap();
+        let last_message_offset = if self.last_msg_sent.is_null() {
+            None
+        } else {
+            Some(unsafe { map.msg_to_offset(self.last_msg_sent) }?)
+        };
+        Ok(LlmpDescription {
+            shmem: map.shmem.description(),
+            last_message_offset,
+        })
+    }
+}
+
 /// Receiving end on a (unidirectional) sharedmap channel
 #[derive(Debug)]
 pub struct LlmpReceiver<SHM, SP> {
@@ -1615,15 +1620,6 @@ where
         )
     }
 
-    /// Store the info to this receiver to env.
-    /// A new client can reattach to it using [`LlmpReceiver::on_existing_from_env()`]
-    #[cfg(feature = "std")]
-    pub fn to_env(&self, env_name: &str) -> Result<(), Error> {
-        let current_out_shmem = &self.current_recv_shmem;
-        current_out_shmem.shmem.write_to_env(env_name)?;
-        unsafe { current_out_shmem.msg_to_env(self.last_msg_recvd, env_name) }
-    }
-
     /// Create a Receiver, reattaching to an existing sender map.
     /// It is essential, that the sender (or someone else) keeps a pointer to the `sender_shmem`
     /// else reattach will get a new, empty page, from the OS, or fail
@@ -1863,6 +1859,33 @@ where
         }
     }
 
+    /// Create this client on an existing map from the given description. acquired with `self.describe`
+    pub fn on_existing_from_description(
+        mut shmem_provider: SP,
+        description: &LlmpDescription,
+    ) -> Result<Self, Error> {
+        Self::on_existing_shmem(
+            shmem_provider.clone(),
+            shmem_provider.shmem_from_description(description.shmem)?,
+            description.last_message_offset,
+        )
+    }
+}
+
+/// Receiving end of an llmp channel
+impl<SHM, SP> LlmpReceiver<SHM, SP>
+where
+    SHM: ShMem,
+{
+    /// Store the info to this receiver to env.
+    /// A new client can reattach to it using [`LlmpReceiver::on_existing_from_env()`]
+    #[cfg(feature = "std")]
+    pub fn to_env(&self, env_name: &str) -> Result<(), Error> {
+        let current_out_shmem = &self.current_recv_shmem;
+        current_out_shmem.shmem.write_to_env(env_name)?;
+        unsafe { current_out_shmem.msg_to_env(self.last_msg_recvd, env_name) }
+    }
+
     /// Describe this client in a way, that it can be restored later with [`Self::on_existing_from_description`]
     pub fn describe(&self) -> Result<LlmpDescription, Error> {
         let map = &self.current_recv_shmem;
@@ -1876,18 +1899,6 @@ where
             last_message_offset,
         })
     }
-
-    /// Create this client on an existing map from the given description. acquired with `self.describe`
-    pub fn on_existing_from_description(
-        mut shmem_provider: SP,
-        description: &LlmpDescription,
-    ) -> Result<Self, Error> {
-        Self::on_existing_shmem(
-            shmem_provider.clone(),
-            shmem_provider.shmem_from_description(description.shmem)?,
-            description.last_message_offset,
-        )
-    }
 }
 
 /// A page wrapper
@@ -3441,6 +3452,50 @@ where
     SHM: ShMem,
     SP: ShMemProvider<ShMem = SHM>,
 {
+    /// Creates a new [`LlmpClient`]
+    pub fn new(
+        mut shmem_provider: SP,
+        initial_broker_shmem: LlmpSharedMap<SHM>,
+        sender_id: ClientId,
+    ) -> Result<Self, Error> {
+        Ok(Self {
+            sender: LlmpSender {
+                id: sender_id,
+                last_msg_sent: ptr::null_mut(),
+                out_shmems: vec![LlmpSharedMap::new(sender_id, {
+                    shmem_provider.new_shmem(LLMP_CFG_INITIAL_MAP_SIZE)?
+                })],
+                // drop pages to the broker if it already read them
+                keep_pages_forever: false,
+                has_unsent_message: false,
+                shmem_provider: shmem_provider.clone(),
+                unused_shmem_cache: vec![],
+            },
+
+            receiver: LlmpReceiver {
+                id: ClientId(0),
+                current_recv_shmem: initial_broker_shmem,
+                last_msg_recvd: ptr::null_mut(),
+                shmem_provider,
+                highest_msg_id: MessageId(0),
+                // We don't know the last received time, just assume the current time.
+                #[cfg(feature = "std")]
+                last_msg_time: current_time(),
+            },
+        })
+    }
+
+    /// Create a point-to-point channel instead of using a broker-client channel
+    pub fn new_p2p(shmem_provider: SP, sender_id: ClientId) -> Result<Self, Error> {
+        let sender = LlmpSender::new(shmem_provider.clone(), sender_id, false)?;
+        let receiver = LlmpReceiver::on_existing_shmem(
+            shmem_provider,
+            sender.out_shmems[0].shmem.clone(),
+            None,
+        )?;
+        Ok(Self { sender, receiver })
+    }
+
     /// Reattach to a vacant client map.
     /// It is essential, that the broker (or someone else) kept a pointer to the `out_shmem`
     /// else reattach will get a new, empty page, from the OS, or fail
@@ -3481,22 +3536,6 @@ where
         })
     }
 
-    /// Write the current state to env.
-    /// A new client can attach to exactly the same state by calling [`LlmpClient::on_existing_shmem()`].
-    #[cfg(feature = "std")]
-    pub fn to_env(&self, env_name: &str) -> Result<(), Error> {
-        self.sender.to_env(&format!("{env_name}_SENDER"))?;
-        self.receiver.to_env(&format!("{env_name}_RECEIVER"))
-    }
-
-    /// Describe this client in a way that it can be recreated, for example after crash
-    pub fn describe(&self) -> Result<LlmpClientDescription, Error> {
-        Ok(LlmpClientDescription {
-            sender: self.sender.describe()?,
-            receiver: self.receiver.describe()?,
-        })
-    }
-
     /// Create an existing client from description
     pub fn existing_client_from_description(
         shmem_provider: SP,
@@ -3514,102 +3553,6 @@ where
         })
     }
 
-    /// Outgoing channel to the broker
-    #[must_use]
-    pub fn sender(&self) -> &LlmpSender<SHM, SP> {
-        &self.sender
-    }
-
-    /// Outgoing channel to the broker (mut)
-    #[must_use]
-    pub fn sender_mut(&mut self) -> &mut LlmpSender<SHM, SP> {
-        &mut self.sender
-    }
-
-    /// Incoming (broker) broadcast map
-    #[must_use]
-    pub fn receiver(&self) -> &LlmpReceiver<SHM, SP> {
-        &self.receiver
-    }
-
-    /// Incoming (broker) broadcast map (mut)
-    #[must_use]
-    pub fn receiver_mut(&mut self) -> &mut LlmpReceiver<SHM, SP> {
-        &mut self.receiver
-    }
-
-    /// Waits for the sender to be save to unmap.
-    /// If a receiver is involved on the other side, this function should always be called.
-    pub fn await_safe_to_unmap_blocking(&self) {
-        self.sender.await_safe_to_unmap_blocking();
-    }
-
-    /// If we are allowed to unmap this client
-    pub fn safe_to_unmap(&self) -> bool {
-        self.sender.safe_to_unmap()
-    }
-
-    /// For debug purposes: mark the client as save to unmap, even though it might not have been read.
-    ///
-    /// # Safety
-    /// This should only be called in a debug scenario.
-    /// Calling this in other contexts may lead to a premature page unmap and result in a crash in another process,
-    /// or an unexpected read from an empty page in a receiving process.
-    pub unsafe fn mark_safe_to_unmap(&mut self) {
-        self.sender.mark_safe_to_unmap();
-    }
-
-    /// Creates a new [`LlmpClient`]
-    pub fn new(
-        mut shmem_provider: SP,
-        initial_broker_shmem: LlmpSharedMap<SHM>,
-        sender_id: ClientId,
-    ) -> Result<Self, Error> {
-        Ok(Self {
-            sender: LlmpSender {
-                id: sender_id,
-                last_msg_sent: ptr::null_mut(),
-                out_shmems: vec![LlmpSharedMap::new(sender_id, {
-                    shmem_provider.new_shmem(LLMP_CFG_INITIAL_MAP_SIZE)?
-                })],
-                // drop pages to the broker if it already read them
-                keep_pages_forever: false,
-                has_unsent_message: false,
-                shmem_provider: shmem_provider.clone(),
-                unused_shmem_cache: vec![],
-            },
-
-            receiver: LlmpReceiver {
-                id: ClientId(0),
-                current_recv_shmem: initial_broker_shmem,
-                last_msg_recvd: ptr::null_mut(),
-                shmem_provider,
-                highest_msg_id: MessageId(0),
-                // We don't know the last received time, just assume the current time.
-                #[cfg(feature = "std")]
-                last_msg_time: current_time(),
-            },
-        })
-    }
-
-    /// Create a point-to-point channel instead of using a broker-client channel
-    pub fn new_p2p(shmem_provider: SP, sender_id: ClientId) -> Result<Self, Error> {
-        let sender = LlmpSender::new(shmem_provider.clone(), sender_id, false)?;
-        let receiver = LlmpReceiver::on_existing_shmem(
-            shmem_provider,
-            sender.out_shmems[0].shmem.clone(),
-            None,
-        )?;
-        Ok(Self { sender, receiver })
-    }
-
-    /// Commits a msg to the client's out map
-    /// # Safety
-    /// Needs to be called with a proper msg pointer
-    pub unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> {
-        self.sender.send(msg, true)
-    }
-
     /// Allocates a message of the given size, tags it, and sends it off.
     pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error> {
         self.sender.send_buf(tag, buf)
@@ -3751,6 +3694,81 @@ where
     }
 }
 
+impl<SHM, SP> LlmpClient<SHM, SP>
+where
+    SHM: ShMem,
+{
+    /// Waits for the sender to be safe to unmap.
+    /// If a receiver is involved on the other side, this function should always be called.
+    pub fn await_safe_to_unmap_blocking(&self) {
+        self.sender.await_safe_to_unmap_blocking();
+    }
+
+    /// If we are allowed to unmap this client
+    pub fn safe_to_unmap(&self) -> bool {
+        self.sender.safe_to_unmap()
+    }
+
+    /// For debug purposes: mark the client as safe to unmap, even though it might not have been read.
+    ///
+    /// # Safety
+    /// This should only be called in a debug scenario.
+    /// Calling this in other contexts may lead to a premature page unmap and result in a crash in another process,
+    /// or an unexpected read from an empty page in a receiving process.
+    pub unsafe fn mark_safe_to_unmap(&mut self) {
+        self.sender.mark_safe_to_unmap();
+    }
+
+    /// Commits a msg to the client's out map
+    /// # Safety
+    /// Needs to be called with a proper msg pointer
+    pub unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> {
+        self.sender.send(msg, true)
+    }
+
+    /// Write the current state to env.
+    /// A new client can attach to exactly the same state by calling [`LlmpClient::on_existing_shmem()`].
+    #[cfg(feature = "std")]
+    pub fn to_env(&self, env_name: &str) -> Result<(), Error> {
+        self.sender.to_env(&format!("{env_name}_SENDER"))?;
+        self.receiver.to_env(&format!("{env_name}_RECEIVER"))
+    }
+
+    /// Describe this client in a way that it can be recreated, for example after crash
+    pub fn describe(&self) -> Result<LlmpClientDescription, Error> {
+        Ok(LlmpClientDescription {
+            sender: self.sender.describe()?,
+            receiver: self.receiver.describe()?,
+        })
+    }
+}
+
+impl<SHM, SP> LlmpClient<SHM, SP> {
+    /// Outgoing channel to the broker
+    #[must_use]
+    pub fn sender(&self) -> &LlmpSender<SHM, SP> {
+        &self.sender
+    }
+
+    /// Outgoing channel to the broker (mut)
+    #[must_use]
+    pub fn sender_mut(&mut self) -> &mut LlmpSender<SHM, SP> {
+        &mut self.sender
+    }
+
+    /// Incoming (broker) broadcast map
+    #[must_use]
+    pub fn receiver(&self) -> &LlmpReceiver<SHM, SP> {
+        &self.receiver
+    }
+
+    /// Incoming (broker) broadcast map (mut)
+    #[must_use]
+    pub fn receiver_mut(&mut self) -> &mut LlmpReceiver<SHM, SP> {
+        &mut self.receiver
+    }
+}
+
 #[cfg(test)]
 #[cfg(all(unix, feature = "std", not(target_os = "haiku")))]
 mod tests {
diff --git a/libafl_libfuzzer/runtime/src/merge.rs b/libafl_libfuzzer/runtime/src/merge.rs
index d7917a6039..16a6f190c8 100644
--- a/libafl_libfuzzer/runtime/src/merge.rs
+++ b/libafl_libfuzzer/runtime/src/merge.rs
@@ -9,7 +9,7 @@ use std::{
 
 use libafl::{
     corpus::Corpus,
-    events::{ManagerExit, SimpleRestartingEventManager},
+    events::{SendExiting, SimpleRestartingEventManager},
     executors::{ExitKind, InProcessExecutor},
     feedback_and_fast, feedback_or_fast,
     feedbacks::{CrashFeedback, MinMapFeedback, TimeoutFeedback},