Skip to content

Commit

Permalink
filter_lua: Add chunk mode for processing multiple records
Browse files Browse the repository at this point in the history
Documentation for fluent/fluent-bit#8478

Signed-off-by: Richard Treu <[email protected]>
  • Loading branch information
drbugfinder-work authored Feb 12, 2024
1 parent a039f6f commit d52d17e
Showing 1 changed file with 180 additions and 0 deletions.
180 changes: 180 additions & 0 deletions pipeline/filters/lua.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The plugin supports the following configuration parameters:
| time\_as\_table | By default when the Lua script is invoked, the record timestamp is passed as a *floating number* which might lead to precision loss when it is converted back. If you desire timestamp precision, enabling this option will pass the timestamp as a Lua table with keys `sec` for seconds since epoch and `nsec` for nanoseconds. |
| code | Inline LUA code instead of loading from a path via `script`. |
| enable_flb_null| If enabled, null will be converted to flb_null in Lua. It is useful to prevent removing key/value since nil is a special value to remove key value from map in Lua. Default is false. |
| chunk_mode | If enabled, a whole chunk will be sent to Lua script as a table of timestamps and records. It may be used for e.g. parallel execution inside Lua. Default is false. |

## Getting Started <a id="getting_started"></a>

Expand Down Expand Up @@ -348,3 +349,182 @@ Configuration to get istio logs and apply response code filter to them.
#### Output

In the output only the messages with response code 0 or greater than 399 are shown.

### Chunk mode

There is a `chunk_mode` for the Lua filter in Fluent Bit. This mode can be useful for cases like parallelization, particularly when utilizing Lua lanes.

#### Function Signature

The Lua functions associated with this mode accept only two arguments:

```
function process_records(tag, records)
```

#### Configuration

The configuration for the Lua filter using chunk mode looks like this:

```
[FILTER]
Name lua
Match my_logs
script lanes_example.lua
call process_records
chunk_mode On
time_as_table On
```
#### Note

- This mode currently only supports `time_as_table` by default.
- Records are always emitted; there is no return code to be set.

#### Return Table Format

The return table must maintain this format, i.e., a table of timestamp and record pairs.

| Timestamp | Record |
|--------------------------|---------------------------------------|
| { | { |
| sec: <timestamp_sec>, | message: "your_dummy_log_message" |
| nsec: <timestamp_nsec>| } |
| } | |
| | |
| { | { |
| sec: <timestamp_sec>, | message: "your_dummy_log_message" |
| nsec: <timestamp_nsec>| } |
| } | |

Please refer to the following examples to see how to build the return table.

#### Input Table Example

```
function process_records(tag, records)
if records and type(records) == "table" then
for i, record_row in ipairs(records) do
local timestamp = record_row.timestamp
local record = record_row.record
print("Timestamp entry:", timestamp.sec, timestamp.nsec)
print("Record entry:", record.message)
end
else
print("Error: Invalid 'records' table or nil")
end
return records
end
```


#### Parallelization Example

Ensure that you have Lua lanes installed (e.g. `luarocks install lanes`) and to set the path appropriately (`luarocks show lanes`) in your lua script.
To inject multiple dummy messages at once, you can adjust the `Copies` parameter of the `dummy` input.
Keep in mind that this example will create a new thread for every record in the chunk to keept the example simple.

```
fluent-bit.conf:
[SERVICE]
Flush 5
Log_Level debug
Daemon off
HTTP_Server Off
[INPUT]
Name dummy
Tag my_logs
Rate 1
Copies 2
Dummy {"message":"your_dummy_log_message"}
[FILTER]
Name lua
Match my_logs
script lanes_example.lua
call threads
chunk_mode On
time_as_table On
[OUTPUT]
Name stdout
Match my_logs
```

```lua
lanes_example.lua:

-- Specify path to Lua Lanes module
-- Install via: 'luarocks install lanes'
local lanes_path = "/usr/local/share/lua/5.1/lanes.lua"

-- Load Lanes lib
local lanes = assert(loadfile(lanes_path))().configure()

-- Lua function that will be executed as separate threads
local function process_log(timestamp, record)
-- Add your CPU intensive code here
print("Timestamp:", timestamp.sec, timestamp.nsec)
print("Record:", record.message)

record.message = "Modified"
return timestamp, record
end

-- Entry function
function threads(tag, records)
print("LUA ")
local thread_handles = {}
local results = {}
if records and type(records) == "table" then
print("Number of incoming records:", #records)
for i, log_event in ipairs(records) do
local timestamp = log_event.timestamp
local record = log_event.record

-- Use lanes.gen to create a new thread
local thread = lanes.gen("*", process_log)(timestamp, record)

-- Store the thread handle
table.insert(thread_handles, thread)
end
-- Wait for all threads to finish
for _, thread in ipairs(thread_handles) do
-- Get the result returned by each thread
local modified_record = thread[2]
local modified_timestamp = thread[1]
local result = {timestamp = modified_timestamp, record = modified_record}
table.insert(results, result)
end
print("All threads returned")
else
print("Error: Invalid or nil 'records' table.")
end

return results
end
```

You should get the similar output:
```
...
LUA
Number of incoming records: 2
Timestamp: 1707308482 97855348
Record: your_dummy_log_message
Timestamp: 1707308482 97664060
Record: your_dummy_log_message
All threads returned
...
[0] my_logs: [[1707308479.115073794, {}], {"message"=>"Modified"}]
[1] my_logs: [[1707308479.147705065, {}], {"message"=>"Modified"}]
[2] my_logs: [[1707308480.097053227, {}], {"message"=>"Modified"}]
[3] my_logs: [[1707308480.097306893, {}], {"message"=>"Modified"}]
[4] my_logs: [[1707308481.097325851, {}], {"message"=>"Modified"}]
[5] my_logs: [[1707308481.097515912, {}], {"message"=>"Modified"}]
[6] my_logs: [[1707308482.097664060, {}], {"message"=>"Modified"}]
[7] my_logs: [[1707308482.097855348, {}], {"message"=>"Modified"}]
...
```

0 comments on commit d52d17e

Please sign in to comment.