Skip to content

Commit 2b4ec4b

Browse files
committed
in_amqp: Add documentation for in_amqp plugin
1 parent 4a18664 commit 2b4ec4b

1 file changed

Lines changed: 144 additions & 0 deletions

File tree

pipeline/inputs/amqp.md

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
# AMQP
2+
3+
The _AMQP_ input plugin allows Fluent Bit to consume messages from an AMQP broker such as RabbitMQ. It connects to the specified broker, consumes messages from a queue, and processes them as log records.
4+
5+
## Configuration parameters
6+
7+
The plugin supports the following configuration parameters:
8+
9+
| Key | Description | Default |
10+
|:---|:---|:---|
11+
| `uri` | Specify an AMQP URI to connect to the broker | `amqp://` |
12+
| `queue` | Specify an AMQP queue name to consume from | _none_ (required) |
13+
| `parser` | Specify a parser to process the message payload | _none_ |
14+
| `reconnect.retry_limits` | Maximum number of retries to connect to the broker | `5` |
15+
| `reconnect.retry_interval` | Retry interval (in seconds) to connect to the broker | `60` |
16+
17+
## How it works
18+
19+
The AMQP input plugin connects to an AMQP broker and consumes messages from a specified queue. Each message is processed and converted into a Fluent Bit log record with the following characteristics:
20+
21+
1. The message body becomes the main content of the record
22+
2. AMQP message properties (like content type, routing key, etc.) are added as metadata
23+
3. AMQP message headers are added as a nested metadata field
24+
4. If a parser is specified, it's applied to the message body
25+
26+
### Message Properties Mapping
27+
28+
The following AMQP message properties are mapped to Fluent Bit record metadata:
29+
30+
- `routing_key` - The routing key used to route the message to the queue
31+
- `content_type` - The MIME content type of the message
32+
- `content_encoding` - The content encoding of the message
33+
- `correlation_id` - Application correlation identifier
34+
- `reply_to` - Address to reply to
35+
36+
### Message Headers
37+
38+
AMQP message headers are mapped to a `headers` field in the record metadata as a key-value map.
39+
40+
## Get started
41+
42+
To consume messages from an AMQP broker, you can run the plugin from the command line or through the configuration file.
43+
44+
### Command line
45+
46+
The following command will start Fluent Bit with the AMQP input plugin:
47+
48+
```shell
49+
fluent-bit -i amqp -p queue=my_queue -o stdout
50+
```
51+
52+
### Configuration file
53+
54+
In your main configuration file, append the following sections:
55+
56+
{% tabs %}
57+
{% tab title="fluent-bit.yaml" %}
58+
59+
```yaml
60+
pipeline:
61+
inputs:
62+
- name: amqp
63+
queue: my_queue
64+
uri: amqp://guest:guest@localhost:5672/%2F
65+
66+
outputs:
67+
- name: stdout
68+
match: '*'
69+
```
70+
71+
{% endtab %}
72+
{% tab title="fluent-bit.conf" %}
73+
74+
```text
75+
[INPUT]
76+
Name amqp
77+
Queue my_queue
78+
URI amqp://guest:guest@localhost:5672/%2F
79+
80+
[OUTPUT]
81+
Name stdout
82+
Match *
83+
```
84+
85+
{% endtab %}
86+
{% endtabs %}
87+
88+
## Example: Consuming JSON messages
89+
90+
If your AMQP messages contain JSON data, you can use a parser to process them:
91+
92+
{% tabs %}
93+
{% tab title="fluent-bit.yaml" %}
94+
95+
```yaml
96+
pipeline:
97+
inputs:
98+
- name: amqp
99+
queue: json_messages
100+
parser: json
101+
uri: amqp://guest:guest@localhost:5672/%2F
102+
103+
parsers:
104+
- name: json
105+
format: json
106+
107+
outputs:
108+
- name: stdout
109+
match: '*'
110+
```
111+
112+
{% endtab %}
113+
{% tab title="fluent-bit.conf" %}
114+
115+
```text
116+
[INPUT]
117+
Name amqp
118+
Queue json_messages
119+
Parser json
120+
URI amqp://guest:guest@localhost:5672/%2F
121+
122+
[PARSER]
123+
Name json
124+
Format json
125+
126+
[OUTPUT]
127+
Name stdout
128+
Match *
129+
```
130+
131+
{% endtab %}
132+
{% endtabs %}
133+
134+
## Connection Management
135+
136+
The plugin handles connection failures gracefully:
137+
138+
1. If the initial connection fails, it will retry based on `reconnect.retry_limits` and `reconnect.retry_interval`
139+
2. If a connection is lost during operation, the plugin will automatically attempt to reconnect
140+
3. Messages consumed but not yet processed will be requeued by the broker (assuming proper acknowledgment settings)
141+
142+
## Requirements
143+
144+
The AMQP input plugin requires the RabbitMQ C client library (rabbitmq-c) to be installed on the system.

0 commit comments

Comments
 (0)