Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: signal blob implementation #9326

Merged
merged 70 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
b7510a0
sched: add coroutine support to timer callback
edsiper Sep 16, 2024
1c93041
input_event: add support for Blob signal
edsiper Sep 16, 2024
67d3296
input_chunk: register chunks blob type
edsiper Sep 16, 2024
2404460
input: use FLB_INLINE macro
edsiper Sep 16, 2024
4996a26
engine_macros: adjust scheduler macros
edsiper Sep 16, 2024
19618c3
event: map event type blobs as chunk type blobs
edsiper Sep 16, 2024
86d325b
output_thread: add plugin callback for worker initialization
edsiper Sep 16, 2024
bee712b
utils: add new function to read files by offset
edsiper Sep 16, 2024
cefd067
router: allow routing for blob input types
edsiper Sep 16, 2024
9cdc593
sqldb: commit before closing handler
edsiper Sep 16, 2024
332988f
output: register output blob
edsiper Sep 16, 2024
63c8b8b
input_blob: add new interface to register blobs
edsiper Sep 16, 2024
6b831ae
input_chunk: event type blob support
edsiper Sep 16, 2024
6901e6f
in_blob: new plugin to support large binary files
edsiper Sep 16, 2024
4a99c4c
out_azure_blob: add support to upload large binary files
edsiper Sep 16, 2024
7bd6bd1
build: register in_blob
edsiper Sep 16, 2024
e7defa8
out_stdout: support blob type (print targe file name)
edsiper Sep 16, 2024
84c84a7
scheduler: fixed typo
leonardo-albertovich Sep 17, 2024
38e7ff0
scheduler: fixed typo
leonardo-albertovich Sep 17, 2024
aa6717e
http_client: added DELETE support
leonardo-albertovich Sep 20, 2024
f7b7b6c
input_blob: added source tracking field
leonardo-albertovich Sep 20, 2024
553bf3f
out_azure_blob: added blob delete function and both file and part retry
leonardo-albertovich Sep 20, 2024
53a72f6
out_azure_blob: added remote configuration retrieval functionality
leonardo-albertovich Sep 20, 2024
997aa81
out_azure_blob: added retry and source tracking related functionality
leonardo-albertovich Sep 20, 2024
78adc86
in_blob: added fluentd style recursive file discovery
leonardo-albertovich Sep 21, 2024
ee429c7
config: added plugin notification channel
leonardo-albertovich Sep 21, 2024
5d98dc5
engine_macros: added plugin notification event type
leonardo-albertovich Sep 21, 2024
2141b5b
filter: added plugin notification feature
leonardo-albertovich Sep 21, 2024
dbd0e9b
input: added plugin notification feature
leonardo-albertovich Sep 21, 2024
c489269
input: added plugin notification feature
leonardo-albertovich Sep 21, 2024
5ec1810
output: added plugin notification feature
leonardo-albertovich Sep 21, 2024
c812d4f
input: added plugin notification feature
leonardo-albertovich Sep 21, 2024
6e7e1ca
processor: added plugin notification feature
leonardo-albertovich Sep 21, 2024
594fe47
build: added plugin notification feature
leonardo-albertovich Sep 21, 2024
d08ce1f
engine: added plugin notification feature
leonardo-albertovich Sep 21, 2024
689883b
input_blob: added delivery notification structure and cleanup function
leonardo-albertovich Sep 21, 2024
aee897e
in_blob: added notification handler
leonardo-albertovich Sep 21, 2024
111a45e
out_azure_blob: added delivery notification feature
leonardo-albertovich Sep 21, 2024
249fa6e
notification: initial commit of the notification feature
leonardo-albertovich Sep 21, 2024
a428eac
notification: added notification type field and global list
leonardo-albertovich Sep 22, 2024
9802a51
in_blob: added file upload result notification handling
leonardo-albertovich Sep 22, 2024
bf59de9
input_blob: fixed erroneous file size unpacking
leonardo-albertovich Sep 22, 2024
4a94c2d
out_azure_blob: fixed stored erroneous procedure references and code
leonardo-albertovich Sep 22, 2024
c74cfe4
out_azure_blob: fixed failure notification producer
leonardo-albertovich Sep 22, 2024
df3447e
output: added worker cleanup callback missing initialization call
leonardo-albertovich Sep 24, 2024
9290bb3
scheduler: improved timer coroutine return signal handling
leonardo-albertovich Sep 24, 2024
e0afbcb
scheduler: removed erroneously added field
leonardo-albertovich Sep 24, 2024
492fc60
engine: added missing scheduler coroutine cleanup
leonardo-albertovich Sep 24, 2024
6d39093
output_thread: added missing scheduler coroutine cleanup
leonardo-albertovich Sep 24, 2024
519393b
out_azure_blob: added worker context cleanup and stale file remediation
leonardo-albertovich Sep 24, 2024
ee94110
out_azure_blob: fixed leak and double free
leonardo-albertovich Sep 24, 2024
4d09bb8
out_azure_blob: added stale file remediation
leonardo-albertovich Sep 24, 2024
ddccb4d
out_azure_blob: fixed missing http verb support
leonardo-albertovich Sep 24, 2024
99a9b63
in_blob: fixed memory leaks
leonardo-albertovich Sep 24, 2024
2a0020c
in_blob: fixed memory leaks and added temporary win32 compatibility shim
leonardo-albertovich Sep 25, 2024
4d2d5fb
input_thread: fixed use after free (CID 509982)
leonardo-albertovich Sep 25, 2024
2bd01c1
in_blob: fixed leak (CID 509979)
leonardo-albertovich Sep 25, 2024
8eca355
out_azure_blob: fixed leak (CID 509977)
leonardo-albertovich Sep 25, 2024
d7b9b26
out_azure_blob: fixed leak (CID 509970)
leonardo-albertovich Sep 25, 2024
7a867d3
in_blob: fixed windows compatibility issues
leonardo-albertovich Sep 25, 2024
284013c
out_azure_blob: fixed warnings
leonardo-albertovich Sep 25, 2024
bce1a86
out_azure_blob: added missing function
leonardo-albertovich Sep 26, 2024
6f1da58
out_azure_blob: fixed typo and added missing constant
leonardo-albertovich Sep 26, 2024
b008160
out_azure_blob: added missing macro
leonardo-albertovich Sep 26, 2024
3841f0f
engine: added missing header
leonardo-albertovich Sep 27, 2024
0bf115c
in_blob: fixed typos
leonardo-albertovich Sep 27, 2024
c3355e2
input_blob: fixed warning
leonardo-albertovich Sep 27, 2024
5600b5b
tests: runtime: config_map_opts: initialize scheduler
edsiper Sep 27, 2024
478f91d
tests: runtime: filter_stdout: initialize scheduler
edsiper Sep 27, 2024
d801d96
tests: runtime: custom_calyptia: initialize scheduler
edsiper Sep 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmake/plugins_options.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ option(FLB_MINIMAL "Enable minimal build configuration" No)

# Inputs (sources, data collectors)
# =================================
DEFINE_OPTION(FLB_IN_BLOB "Enable Blob input plugin" ON)
DEFINE_OPTION(FLB_IN_CALYPTIA_FLEET "Enable Calyptia Fleet input plugin" ON)
DEFINE_OPTION(FLB_IN_COLLECTD "Enable Collectd input plugin" ON)
DEFINE_OPTION(FLB_IN_CPU "Enable CPU input plugin" ON)
Expand Down
10 changes: 7 additions & 3 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ struct flb_config {
int is_running; /* service running ? */
double flush; /* Flush timeout */

/*
* Maximum grace time on shutdown. If set to -1, the engine will
/*
* Maximum grace time on shutdown. If set to -1, the engine will
* shutdown when all remaining tasks are flushed
*/
int grace;
int grace;
int grace_count; /* Count of grace shutdown tries */
flb_pipefd_t flush_fd; /* Timer FD associated to flush */
int convert_nan_to_null; /* convert null to nan ? */
Expand All @@ -71,6 +71,10 @@ struct flb_config {

flb_pipefd_t ch_self_events[2]; /* channel to recieve thread tasks */

int notification_channels_initialized;
flb_pipefd_t notification_channels[2];
struct mk_event notification_event;

/* Channel event loop (just for ch_notif) */
struct mk_event_loop *ch_evl;

Expand Down
20 changes: 12 additions & 8 deletions include/fluent-bit/flb_engine_macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@
/* Types of events handled by the Server engine */
#define FLB_ENGINE_EV_CORE MK_EVENT_NOTIFICATION
#define FLB_ENGINE_EV_CUSTOM MK_EVENT_CUSTOM
#define FLB_ENGINE_EV_THREAD 1024
#define FLB_ENGINE_EV_SCHED 2048
#define FLB_ENGINE_EV_SCHED_FRAME (FLB_ENGINE_EV_SCHED + 4096)

#define FLB_ENGINE_EV_INPUT 8192
#define FLB_ENGINE_EV_THREAD_INPUT 16384 /* reserved, not used yet */
#define FLB_ENGINE_EV_THREAD (1 << 10) /* 1024 */
#define FLB_ENGINE_EV_SCHED (1 << 11) /* 2048 */
#define FLB_ENGINE_EV_SCHED_FRAME (FLB_ENGINE_EV_SCHED | (1 << 12)) /* 2048 | 4096 = 6144 */
#define FLB_ENGINE_EV_SCHED_CORO (1 << 13) /* 8192 */

#define FLB_ENGINE_EV_OUTPUT 32768
#define FLB_ENGINE_EV_THREAD_OUTPUT 65536
#define FLB_ENGINE_EV_THREAD_ENGINE 131072
#define FLB_ENGINE_EV_INPUT (1 << 14) /* 16384 */
#define FLB_ENGINE_EV_THREAD_INPUT (1 << 15) /* 32768 */

#define FLB_ENGINE_EV_OUTPUT (1 << 16) /* 65536 */
#define FLB_ENGINE_EV_THREAD_OUTPUT (1 << 17) /* 131072 */
#define FLB_ENGINE_EV_THREAD_ENGINE (1 << 18) /* 262144 */

#define FLB_ENGINE_EV_NOTIFICATION (1 << 19) /* 524288 */

/* Engine events: all engine events set the left 32 bits to '1' */
#define FLB_ENGINE_EV_STARTED FLB_BITS_U64_SET(1, 1) /* Engine started */
Expand Down
1 change: 1 addition & 0 deletions include/fluent-bit/flb_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#define FLB_EVENT_TYPE_LOGS FLB_INPUT_CHUNK_TYPE_LOGS
#define FLB_EVENT_TYPE_METRICS FLB_INPUT_CHUNK_TYPE_METRICS
#define FLB_EVENT_TYPE_TRACES FLB_INPUT_CHUNK_TYPE_TRACES
#define FLB_EVENT_TYPE_BLOBS FLB_INPUT_CHUNK_TYPE_BLOBS

#define FLB_EVENT_TYPE_HAS_TRACE FLB_INPUT_CHUNK_HAS_TRACE

Expand Down
4 changes: 4 additions & 0 deletions include/fluent-bit/flb_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ struct flb_filter_plugin {
void *, struct flb_config *);
int (*cb_exit) (void *, struct flb_config *);

/* Notification: this callback will be invoked anytime a notification is received*/
int (*cb_notification) (struct flb_filter_instance *, struct flb_config *, void *);

struct mk_list _head; /* Link to parent list (config->filters) */
};

Expand Down Expand Up @@ -110,6 +113,7 @@ struct flb_filter_instance {
#ifdef FLB_HAVE_METRICS
struct flb_metrics *metrics; /* metrics */
#endif
flb_pipefd_t notification_channel;

/* Keep a reference to the original context this instance belongs to */
struct flb_config *config;
Expand Down
1 change: 1 addition & 0 deletions include/fluent-bit/flb_http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#define FLB_HTTP_HEAD 3
#define FLB_HTTP_CONNECT 4
#define FLB_HTTP_PATCH 5
#define FLB_HTTP_DELETE 6

/* HTTP Flags */
#define FLB_HTTP_10 1
Expand Down
10 changes: 8 additions & 2 deletions include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ struct flb_input_plugin {
/* Collect: every certain amount of time, Fluent Bit trigger this callback */
int (*cb_collect) (struct flb_input_instance *, struct flb_config *, void *);

/* Notification: this callback will be invoked anytime a notification is received*/
int (*cb_notification) (struct flb_input_instance *, struct flb_config *, void *);

/*
* Flush: each plugin during a collection, it does some buffering,
* when the Flush timer takes place on the Engine, it will trigger
Expand Down Expand Up @@ -381,6 +384,8 @@ struct flb_input_instance {
struct mk_list *net_config_map;
struct mk_list net_properties;

flb_pipefd_t notification_channel;

/* Keep a reference to the original context this instance belongs to */
struct flb_config *config;
};
Expand Down Expand Up @@ -577,7 +582,7 @@ static FLB_INLINE int flb_input_is_threaded(struct flb_input_instance *ins)
* number of retries, if it has exceeded the 'retry_limit' option, an FLB_ERROR
* will be returned instead.
*/
static inline void flb_input_return(struct flb_coro *coro) {
static FLB_INLINE void flb_input_return(struct flb_coro *coro) {
int n;
uint64_t val;
struct flb_input_coro *input_coro;
Expand All @@ -599,7 +604,7 @@ static inline void flb_input_return(struct flb_coro *coro) {
flb_input_coro_prepare_destroy(input_coro);
}

static inline void flb_input_return_do(int ret) {
static FLB_INLINE void flb_input_return_do(int ret) {
struct flb_coro *coro = flb_coro_get();

flb_input_return(coro);
Expand Down Expand Up @@ -733,6 +738,7 @@ int flb_input_upstream_set(struct flb_upstream *u, struct flb_input_instance *in
int flb_input_downstream_set(struct flb_downstream *stream,
struct flb_input_instance *ins);


/* processors */
int flb_input_instance_processors_load(struct flb_input_instance *ins, struct flb_cf_group *processors);

Expand Down
47 changes: 47 additions & 0 deletions include/fluent-bit/flb_input_blob.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2024 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef FLB_INPUT_BLOB_H
#define FLB_INPUT_BLOB_H

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_notification.h>
#include <fluent-bit/flb_log_event_encoder.h>

struct flb_blob_delivery_notification {
struct flb_notification base;
cfl_sds_t path;
int success;
};


struct flb_blob_file {
cfl_sds_t path;
};

void flb_input_blob_delivery_notification_destroy(void *instance);

int flb_input_blob_file_get_info(msgpack_object map, cfl_sds_t *source,
cfl_sds_t *file_path, size_t *size);
int flb_input_blob_file_register(struct flb_input_instance *ins,
struct flb_log_event_encoder *encoder,
const char *tag, size_t tag_len,
char *file_path, size_t size);
#endif
1 change: 1 addition & 0 deletions include/fluent-bit/flb_input_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#define FLB_INPUT_CHUNK_TYPE_LOGS 0
#define FLB_INPUT_CHUNK_TYPE_METRICS 1
#define FLB_INPUT_CHUNK_TYPE_TRACES 2
#define FLB_INPUT_CHUNK_TYPE_BLOBS 3

#ifdef FLB_HAVE_CHUNK_TRACE
#define FLB_INPUT_CHUNK_HAS_TRACE 1 << 31
Expand Down
4 changes: 2 additions & 2 deletions include/fluent-bit/flb_input_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
#ifndef FLB_INPUT_EVENT_H
#define FLB_INPUT_EVENT_H

/* support event types by input plugins*/

/* support event types by input plugins */
#define FLB_INPUT_LOGS 0
#define FLB_INPUT_METRICS 1
#define FLB_INPUT_TRACES 2
#define FLB_INPUT_BLOBS 3

#endif
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_input_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ struct flb_input_thread_instance {
struct mk_event_loop *evl; /* thread event loop context */
flb_pipefd_t ch_parent_events[2]; /* communication between parent and thread */
flb_pipefd_t ch_thread_events[2]; /* local messages in the thread event loop */
int notification_channels_initialized;
flb_pipefd_t notification_channels[2];
struct mk_event notification_event;
struct flb_input_instance *ins; /* output plugin instance */
struct flb_tp *tp;
struct flb_tp_thread *th;
Expand Down
54 changes: 54 additions & 0 deletions include/fluent-bit/flb_notification.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2024 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef FLB_NOTIFICATION_H
#define FLB_NOTIFICATION_H

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_socket.h>
#include <fluent-bit/flb_config.h>
#include <cfl/cfl_sds.h>


#define FLB_NOTIFICATION_TYPE_BLOB_DELIVERY 1


struct flb_notification {
int plugin_type;
void *plugin_instance;
int notification_type;
int dynamically_allocated;

void (*destructor)(void *);
};

int flb_notification_enqueue(int plugin_type,
char *instance_name,
struct flb_notification *notification,
struct flb_config *config);

int flb_notification_receive(flb_pipefd_t channel,
struct flb_notification **notification);

int flb_notification_deliver(struct flb_notification *notification);

void flb_notification_cleanup(struct flb_notification *notification);

#endif
9 changes: 9 additions & 0 deletions include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ int flb_chunk_trace_output(struct flb_chunk_trace *trace, struct flb_output_inst
#define FLB_OUTPUT_LOGS 1
#define FLB_OUTPUT_METRICS 2
#define FLB_OUTPUT_TRACES 4
#define FLB_OUTPUT_BLOBS 8

#define FLB_OUTPUT_FLUSH_COMPAT_OLD_18() \
const void *data = event_chunk->data; \
Expand Down Expand Up @@ -232,6 +233,12 @@ struct flb_output_plugin {
/* Default number of worker threads */
int workers;

int (*cb_worker_init) (void *, struct flb_config *);
int (*cb_worker_exit) (void *, struct flb_config *);

/* Notification: this callback will be invoked anytime a notification is received*/
int (*cb_notification) (struct flb_output_instance *, struct flb_config *, void *);

/* Tests */
struct flb_test_out_formatter test_formatter;

Expand Down Expand Up @@ -445,6 +452,8 @@ struct flb_output_instance {
struct mk_list flush_list;
struct mk_list flush_list_destroy;

flb_pipefd_t notification_channel;

/* Keep a reference to the original context this instance belongs to */
struct flb_config *config;
};
Expand Down
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_output_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ struct flb_out_thread_instance {
struct flb_bucket_queue *evl_bktq; /* bucket queue for evl track event priority */
flb_pipefd_t ch_parent_events[2]; /* channel to receive parent notifications */
flb_pipefd_t ch_thread_events[2]; /* channel to send messages local event loop */
int notification_channels_initialized;
flb_pipefd_t notification_channels[2];
struct mk_event notification_event;
struct flb_output_instance *ins; /* output plugin instance */
struct flb_config *config;
struct flb_tp_thread *th;
Expand Down
7 changes: 7 additions & 0 deletions include/fluent-bit/flb_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ struct flb_processor {
void *data;
int source_plugin_type;

flb_pipefd_t notification_channel;

/* Fluent Bit context */
struct flb_config *config;
};
Expand Down Expand Up @@ -155,6 +157,9 @@ struct flb_processor_plugin {

int (*cb_exit) (struct flb_processor_instance *, void *);

/* Notification: this callback will be invoked anytime a notification is received*/
int (*cb_notification) (struct flb_processor_instance *, struct flb_config *, void *);

struct mk_list _head; /* Link to parent list (config->filters) */
};

Expand All @@ -179,6 +184,8 @@ struct flb_processor_instance {
*/
struct cmt *cmt; /* parent context */

flb_pipefd_t notification_channel;

/* Keep a reference to the original context this instance belongs to */
struct flb_config *config;
};
Expand Down
4 changes: 4 additions & 0 deletions include/fluent-bit/flb_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ static inline int flb_router_match_type(int in_event_type,
!(o_ins->event_type & FLB_OUTPUT_TRACES)) {
return FLB_FALSE;
}
else if (in_event_type == FLB_INPUT_BLOBS &&
!(o_ins->event_type & FLB_OUTPUT_BLOBS)) {
return FLB_FALSE;
}

return FLB_TRUE;
}
Expand Down
Loading
Loading