Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ Changelog
in development
--------------

Fixed
~~~~~
* Remove the usage of the Stackstorm/logshipper (stops working with Python 3.12) and the usage of eventlet in the `linux.file_watch_sensor`. (by @skiedude)

Changed
~~~~~~~

Added
~~~~~

3.9.0 - October 10, 2025
------------------------
Expand Down
39 changes: 24 additions & 15 deletions contrib/linux/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,37 @@

This pack contains actions for commonly used Linux commands and tools.

## Configuration
## Sensors

### FileWatchSensor

* ``file_watch_sensor.file_paths`` - A list of paths to the files to monitor.
Note: Those need to be full paths to the files (e.g. ``/var/log/auth.log``)
and not directories (files don't need to exist yet when the sensor is ran
though).
This sensor monitors files that are declared by a rule (one file per rule). Once a new line is
detected, a trigger is emitted.

Example:
### Example Rule:

```yaml
---
file_watch_sensor:
file_paths:
- /opt/data/absolute_path_to_file.log
This example rule will register with the FileWatchSensor to watch the file `/tmp/st2_test`. When a new line is
detected, the trigger will be emitted, and the action `core.local` will echo the trigger data.
```
---
name: sample_rule_file_watch
pack: "examples"
description: Sample rule custom trigger type - add a file to be watched by file_watch_sensor in linux pack.
enabled: true

## Sensors
trigger:
parameters:
file_path: /tmp/st2_test
type: linux.file_watch.line

### FileWatchSensor
criteria: {}

This sensor monitors specified files for new new lines. Once a new line is
detected, a trigger is emitted.
action:
parameters:
cmd: echo "{{trigger}}"
ref: core.local

```

### linux.file_watch.line trigger

Expand Down
29 changes: 0 additions & 29 deletions contrib/linux/sensors/README.md

This file was deleted.

63 changes: 36 additions & 27 deletions contrib/linux/sensors/file_watch_sensor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2020 The StackStorm Authors.
# Copyright 2020-2026 The StackStorm Authors.
# Copyright 2019 Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -14,9 +14,8 @@
# limitations under the License.

import os
import eventlet

from logshipper.tail import Tail
import time
import threading

from st2reactor.sensor.base import Sensor

Expand All @@ -27,25 +26,19 @@ def __init__(self, sensor_service, config=None):
sensor_service=sensor_service, config=config
)
self.log = self._sensor_service.get_logger(__name__)
self.tail = None
self._watchers = {} # file_path -> (thread, stop_event)
self.file_ref = {}

def setup(self):
self.tail = Tail(filenames=[])
self.tail.handler = self._handle_line
self.tail.should_run = True
pass

def run(self):
self.tail.run()
while True:
time.sleep(1)

def cleanup(self):
if self.tail:
self.tail.should_run = False

try:
self.tail.notifier.stop()
except Exception:
self.log.exception("Unable to stop the tail notifier")
for file_path in list(self._watchers):
self._stop_watcher(file_path)

def add_trigger(self, trigger):
file_path = trigger["parameters"].get("file_path", None)
Expand All @@ -54,18 +47,19 @@ def add_trigger(self, trigger):
self.log.error('Received trigger type without "file_path" field.')
return

trigger = trigger.get("ref", None)
ref = trigger.get("ref", None)

if not trigger:
if not ref:
raise Exception(f"Trigger {trigger} did not contain a ref.")

# Wait a bit to avoid initialization race in logshipper library
eventlet.sleep(1.0)
self.file_ref[file_path] = ref

self.tail.add_file(filename=file_path)
self.file_ref[file_path] = trigger
stop_event = threading.Event()
t = threading.Thread(target=self._tail, args=(file_path, stop_event), daemon=True)
self._watchers[file_path] = (t, stop_event)
t.start()

self.log.info(f"Added file '{file_path}' ({trigger}) to watch list.")
self.log.info(f"Added file '{file_path}' ({ref}) to watch list.")

def update_trigger(self, trigger):
pass
Expand All @@ -77,10 +71,25 @@ def remove_trigger(self, trigger):
self.log.error("Received trigger type without 'file_path' field.")
return

self.tail.remove_file(filename=file_path)
self.file_ref.pop(file_path)

self.log.info(f"Removed file '{file_path}' ({trigger}) from watch list.")
self._stop_watcher(file_path)
self.file_ref.pop(file_path, None)

self.log.info(f"Removed file '{file_path}' from watch list.")

def _stop_watcher(self, file_path):
if file_path in self._watchers:
_, stop_event = self._watchers.pop(file_path)
stop_event.set()

def _tail(self, file_path, stop_event):
with open(file_path, "r") as f:
f.seek(0, 2) # seek to EOF
while not stop_event.is_set():
line = f.readline()
if line:
self._handle_line(file_path, line.strip())
else:
time.sleep(0.1)

def _handle_line(self, file_path, line):
if file_path not in self.file_ref:
Expand Down
Loading