
[AWSINTS-3450] feat(go-forwarder): add CloudwatchLogEntry creation logic #1094

Merged
ndakkoune merged 14 commits into nabil.dakkoune/go-forwarder from nabil.dakkoune/AWSINTS-3450
Apr 15, 2026

Conversation

Contributor

@ndakkoune ndakkoune commented Apr 1, 2026

What does this PR do?

Adds the (Cloudwatch)LogEntry creation logic (source detection, tag overrides, etc.).

Motivation

Testing Guidelines

None for now; I'd like quick feedback before writing the tests.

Additional Notes

Apart from skipping CONTROL_MESSAGE events, the behavior is roughly the same as the Python implementation.

Types of changes

  • Bug fix
  • New feature
  • Breaking change
  • Misc (docs, refactoring, dependency upgrade, etc.)

Check all that apply

  • This PR's description is comprehensive
  • This PR contains breaking changes that are documented in the description
  • This PR introduces new APIs or parameters that are documented and unlikely to change in the foreseeable future
  • This PR impacts documentation, and it has been updated (or a ticket has been logged)
  • This PR's changes are covered by the automated tests
  • This PR collects user input/sensitive content into Datadog
  • This PR passes the integration tests (ask a Datadog member to run the tests)
  • This PR passes the unit tests
  • This PR passes the installation tests (ask a Datadog member to run the tests)

@ndakkoune ndakkoune requested a review from a team as a code owner April 1, 2026 16:06
@github-actions github-actions bot added the aws label Apr 1, 2026
Comment on lines +33 to +35
if data.MessageType == "CONTROL_MESSAGE" {
    return
}
Contributor Author

Not present in the Python implementation. Control messages are probe messages that CloudWatch sends to verify that the Lambda is reachable, so forwarding them would only add noise.
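For context, a minimal sketch of the skip on a decoded subscription payload. It assumes the standard CloudWatch Logs format (base64-encoded, gzip-compressed JSON with a messageType field); the names cloudwatchLogsData, decodeAwslogs, messages and encodeForDemo are hypothetical and not taken from the PR.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
)

// cloudwatchLogsData mirrors only the fields this sketch needs.
// The struct name is hypothetical.
type cloudwatchLogsData struct {
	MessageType string `json:"messageType"`
	LogEvents   []struct {
		Message string `json:"message"`
	} `json:"logEvents"`
}

// decodeAwslogs turns the base64-encoded, gzip-compressed payload
// into a cloudwatchLogsData value.
func decodeAwslogs(encoded string) (cloudwatchLogsData, error) {
	var data cloudwatchLogsData
	compressed, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return data, fmt.Errorf("base64 decode: %w", err)
	}
	zr, err := gzip.NewReader(bytes.NewReader(compressed))
	if err != nil {
		return data, fmt.Errorf("gzip reader: %w", err)
	}
	defer zr.Close()
	raw, err := io.ReadAll(zr)
	if err != nil {
		return data, fmt.Errorf("gzip read: %w", err)
	}
	if err := json.Unmarshal(raw, &data); err != nil {
		return data, fmt.Errorf("json decode: %w", err)
	}
	return data, nil
}

// messages returns the log messages to forward, or nil for control messages.
func messages(data cloudwatchLogsData) []string {
	if data.MessageType == "CONTROL_MESSAGE" {
		return nil
	}
	out := make([]string, 0, len(data.LogEvents))
	for _, e := range data.LogEvents {
		out = append(out, e.Message)
	}
	return out
}

// encodeForDemo builds a payload in the same wire format (demo helper only).
func encodeForDemo(jsonPayload string) string {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	zw.Write([]byte(jsonPayload))
	zw.Close()
	return base64.StdEncoding.EncodeToString(buf.Bytes())
}

func main() {
	control := encodeForDemo(`{"messageType":"CONTROL_MESSAGE","logEvents":[{"message":"probe"}]}`)
	data, err := decodeAwslogs(control)
	fmt.Println(len(messages(data)), err)
}
```
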

@ViBiOh ViBiOh self-assigned this Apr 2, 2026
"github.com/aws/aws-lambda-go/lambdacontext"
)

func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) {
Contributor

💬 suggestion: The function name is a bit confusing. A parser is usually a "pure" function: you give it an input and you get back an output and an error.

Here, errors are logged directly and the output goes to the channel. I think we should make this function "pure" (easier to test, and it is currently untested) and let the Parse function that calls it deal with the error and with forwarding to the channel.

Edit: I see below that the log entries are held in memory and then streamed, so maybe just add an intermediate function handleCloudwatchLogs that calls the parser and then streams the result.

Contributor Author

@ndakkoune ndakkoune Apr 2, 2026

Great. I kept the channel approach with only one goroutine for cloudwatch. The S3 Handler will probably trigger more goroutines and leverage streaming.
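A rough sketch of the split discussed in this thread: a pure parser that returns entries and an error, and a thin handler that streams them on the channel. The logEntry type, the payload shape and the function names are simplified assumptions, not the PR's actual code.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// logEntry is a hypothetical stand-in for model.CloudwatchLogEntry.
type logEntry struct {
	Message string `json:"message"`
}

// parseEntries is pure: no logging, no channel, just input to output.
func parseEntries(event json.RawMessage) ([]logEntry, error) {
	var payload struct {
		LogEvents []logEntry `json:"logEvents"`
	}
	if err := json.Unmarshal(event, &payload); err != nil {
		return nil, fmt.Errorf("parse cloudwatch logs: %w", err)
	}
	return payload.LogEvents, nil
}

// handleEntries parses first, then streams; errors go back to the caller.
func handleEntries(ctx context.Context, event json.RawMessage, out chan<- logEntry) error {
	entries, err := parseEntries(event)
	if err != nil {
		return err
	}
	for _, e := range entries {
		select {
		case out <- e:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// collect runs handleEntries against a buffered channel and gathers the
// streamed messages (demo helper, only suitable for small inputs).
func collect(event string) ([]string, error) {
	out := make(chan logEntry, 16)
	err := handleEntries(context.Background(), json.RawMessage(event), out)
	close(out)
	var msgs []string
	for e := range out {
		msgs = append(msgs, e.Message)
	}
	return msgs, err
}

func main() {
	msgs, err := collect(`{"logEvents":[{"message":"hello"}]}`)
	fmt.Println(msgs, err)
}
```

With this shape the parser can be unit-tested with plain inputs, and only the handler needs a channel.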

Comment on lines +57 to +61
select {
case out <- entry:
case <-ctx.Done():
    return
}
Contributor

🥜 nitpick: While this "would" work as expected, it is not deterministic. Have a look at the internal repository: there is a SafeSender method you can copy/paste into a util package here.
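To illustrate the non-determinism: when both cases of a select are ready, Go picks one at random, so a send can still go through on an already cancelled context. The helper below is a hypothetical sketch of one way to address that; it is not the internal SafeSender, whose implementation is not shown here.

```go
package main

import (
	"context"
	"fmt"
)

// safeSend is a hypothetical helper. It reports whether v was delivered,
// and never sends when ctx is already cancelled on entry.
func safeSend[T any](ctx context.Context, out chan<- T, v T) bool {
	// Check cancellation first so an already cancelled context always wins.
	select {
	case <-ctx.Done():
		return false
	default:
	}
	// Then block until the value is sent or the context is cancelled.
	select {
	case out <- v:
		return true
	case <-ctx.Done():
		return false
	}
}

// countSends counts how many of n sends succeed (demo helper).
func countSends(n int, cancelled bool) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelled {
		cancel()
	}
	out := make(chan int, n) // buffered so sends on a live context never block
	sent := 0
	for i := 0; i < n; i++ {
		if safeSend(ctx, out, i) {
			sent++
		}
	}
	return sent
}

func main() {
	fmt.Println(countSends(100, true), countSends(100, false))
}
```

This only makes the already-cancelled case deterministic; a cancellation that lands between the two select statements can still race with the send.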

Comment on lines +77 to +79
if strings.HasPrefix(logStream, "states/") {
    source = "stepfunction"
}
Contributor

💬 suggestion: You can move this after the source override and turn it into a definitive return (a fast exit).

if lc, ok := lambdacontext.FromContext(ctx); ok {
    metadata.InvokedFunctionARN = lc.InvokedFunctionArn
} else {
    slog.Warn("failed lambda context loading")
Contributor

❓ question: What is the actionable step here? We should either enrich the log with some fields or remove it.

Contributor Author

This one is a safety net. The else case is only hit when (1) the code is not run from AWS Lambda (e.g. in tests) or (2) someone passes a plain context.Background(). It should NEVER be hit in production; if it is, the default string value "" will be sent to the backend (we would notice it with a status:skipped).

Contributor

You should change the logged message to this explanation.

My point is: if a customer sees this in 6 months, the log should explain the situation clearly.

@ndakkoune ndakkoune requested a review from ViBiOh April 2, 2026 15:54
@ndakkoune ndakkoune force-pushed the nabil.dakkoune/AWSINTS-3450 branch from 173fefa to 26ba416 Compare April 8, 2026 09:04
"strings"
)

type Tags []string
Contributor Author

This little workaround is needed because of constraints on the backend side.

json.Marshal converts a slice to a JSON array (expected behavior), but the backend needs a string of comma-separated key:value pairs, hence this custom MarshalJSON.
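A minimal sketch of what such a custom marshaler could look like. The PR's actual method body is not shown in this thread, so this is an assumption; the entry struct and the ddtags field name are hypothetical and exist only for the demo.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

type Tags []string

// MarshalJSON encodes the tags as one JSON string of comma-separated
// key:value pairs instead of the default JSON array.
func (t Tags) MarshalJSON() ([]byte, error) {
	return json.Marshal(strings.Join(t, ","))
}

// entry is a hypothetical, trimmed-down log entry used only for this demo.
type entry struct {
	Message string `json:"message"`
	Tags    Tags   `json:"ddtags"`
}

// encode returns the JSON form of an entry as a string (demo helper).
func encode(e entry) (string, error) {
	b, err := json.Marshal(e)
	return string(b), err
}

func main() {
	s, err := encode(entry{Message: "hi", Tags: Tags{"env:prod", "service:api"}})
	fmt.Println(s, err)
}
```

Delegating to json.Marshal on the joined string keeps quoting and escaping correct without building the JSON by hand.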

@ndakkoune ndakkoune force-pushed the nabil.dakkoune/AWSINTS-3450 branch from f7ac191 to 493ec22 Compare April 10, 2026 16:01
@ndakkoune ndakkoune force-pushed the nabil.dakkoune/AWSINTS-3450 branch from dd5ed1c to 4dbd0e9 Compare April 13, 2026 08:29
case parsing.InvocationSourceCloudwatchLogs:
    return pipeline.Run(ctx, event, cfg, parsing.HandleCloudwatchLogs)
default:
    slog.Error("unsupported invocation source")
Contributor

⚠️ issue: You should store the detected invocation source in a variable and include it in the log. As it stands, the log is not actionable and is hard to debug.
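One possible shape for this, as a sketch only. The String method is hand-written here as a stand-in for stringer-generated code, InvocationSourceS3 and the returned error are assumptions, and dispatch and loggedSource are hypothetical names.

```go
package main

import (
	"bytes"
	"fmt"
	"log/slog"
	"strings"
)

// InvocationSource and its String method stand in for the generated code.
type InvocationSource int

const (
	InvocationSourceUnknown InvocationSource = iota
	InvocationSourceCloudwatchLogs
	InvocationSourceS3 // hypothetical: a source that has no handler yet
)

func (s InvocationSource) String() string {
	switch s {
	case InvocationSourceCloudwatchLogs:
		return "CloudwatchLogs"
	case InvocationSourceS3:
		return "S3"
	default:
		return fmt.Sprintf("InvocationSource(%d)", int(s))
	}
}

// dispatch names the detected source in the log when nothing handles it.
func dispatch(logger *slog.Logger, source InvocationSource) error {
	switch source {
	case InvocationSourceCloudwatchLogs:
		return nil // the real pipeline would run here
	default:
		logger.Error("unsupported invocation source", slog.String("source", source.String()))
		return fmt.Errorf("unsupported invocation source %q", source.String())
	}
}

// loggedSource reports whether the log line emitted for source names it
// (demo helper that captures the text handler output).
func loggedSource(source InvocationSource) bool {
	var buf bytes.Buffer
	logger := slog.New(slog.NewTextHandler(&buf, nil))
	_ = dispatch(logger, source)
	return strings.Contains(buf.String(), "source="+source.String())
}

func main() {
	fmt.Println(loggedSource(InvocationSourceS3))
}
```
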

    return err
}
if !ok {
    return nil
Contributor

⚠️ issue: There is a concurrency issue in this function.

We have 3 workers. If any worker encounters an error, it exits and stops dequeuing from the input channel.

If all 3 workers fail, the in channel fills up and nothing dequeues from it, which deadlocks the upstream goroutine.

The only call to Forward is here, but the ctx is not managed by the errgroup.Group. So if a goroutine fails, no signal is sent to the other goroutines to stop.

We should fix the context cancellation in the pipeline file and maybe leave a TODO here to handle the retry mechanism (later in the migration).
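A sketch of the failure mode and one way out of it. To stay dependency-free it swaps errgroup for context.WithCancelCause plus sync.WaitGroup from the standard library; the idea (a failing worker cancels the shared context so the producer stops sending) is the same. The names run, forward and errForward are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errForward is a hypothetical error used by the demo.
var errForward = errors.New("forward failed")

// run feeds n items to 3 workers. forward stands in for the Forward call.
// The first failing worker cancels ctx, which unblocks the producer.
func run(n int, forward func(int) error) error {
	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil)

	in := make(chan int) // unbuffered: a stuck producer would show immediately
	var wg sync.WaitGroup

	// Producer: every send also watches ctx, so it cannot block forever.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(in)
		for i := 0; i < n; i++ {
			select {
			case in <- i:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Workers: the first error is recorded as the cancellation cause.
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case item, ok := <-in:
					if !ok {
						return
					}
					if err := forward(item); err != nil {
						cancel(err)
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	wg.Wait()
	return context.Cause(ctx) // nil when no worker failed
}

func main() {
	err := run(1000, func(int) error { return errForward })
	fmt.Println(errors.Is(err, errForward))
}
```

Without the ctx.Done() case in the producer, three failing workers would leave it blocked on the send forever, which is the deadlock described above.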

@ndakkoune ndakkoune requested a review from ViBiOh April 14, 2026 09:08
func HandleCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) error {
    logEntries, err := parseCloudwatchLogs(ctx, event, cfg)
    if err != nil {
        slog.Error("failed parse cloudwatch logs", slog.Any("error", err))
Contributor

❓ question: We don't return after the parsing error. Should we return the error and let the caller log it, or return nil?

The logEntries below are very likely to be nil/empty, so we will end up returning nil. I think we should return the error: if we can't parse the CloudWatch message, it is very likely a broad failure, and we should not pretend everything was fine to anyone who doesn't look at the logs.

Contributor Author

I'm going with return err.

Comment thread aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go Outdated
Comment thread aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go Outdated
Comment on lines +86 to +92
var source string
if strings.Contains(logStream, "_CloudTrail_") {
    source = "cloudtrail"
} else {
    source = getSourceFromLogGroup(strings.ToLower(logGroup))
}
return source
Contributor

Same here regarding the "fast exit": once you have your answer, exit the function directly. This avoids nested instructions and gives the function a clear execution path.
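Applied to the snippet above, the fast-exit version could look like the sketch below. The body of getSourceFromLogGroup is a heavily simplified, hypothetical stand-in (its real mapping is not shown in this thread), and the getSource name is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// getSourceFromLogGroup is a hypothetical stand-in for the PR's helper.
func getSourceFromLogGroup(logGroup string) string {
	if strings.HasPrefix(logGroup, "/aws/lambda") {
		return "lambda"
	}
	return "cloudwatch"
}

// getSource returns as soon as the answer is known: no else branch and
// no intermediate variable.
func getSource(logGroup, logStream string) string {
	if strings.Contains(logStream, "_CloudTrail_") {
		return "cloudtrail"
	}
	return getSourceFromLogGroup(strings.ToLower(logGroup))
}

func main() {
	fmt.Println(getSource("any-group", "123456789012_CloudTrail_us-east-1"))
	fmt.Println(getSource("/AWS/Lambda/my-function", "2026/04/01/[$LATEST]abc"))
}
```
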

)

type invocationSource int
//go:generate stringer -type InvocationSource -trimprefix InvocationSource
Contributor

👏 praise: 👍

var service string

if cfg.CustomTags != "" {
    for _, tag := range strings.Split(cfg.CustomTags, ",") {
Contributor

I think your code editor will suggest using SplitSeq.

cfg *config.Config,
handler func(context.Context, json.RawMessage, *config.Config, chan<- T) error,
) error {
g, ctx := errgroup.WithContext(ctx) // Fragile because if one goroutine fails, all stages will stop gracefully
Contributor

📝 note: I don't think it's "fragile" in this case. This is the graceful shutdown mechanism, and it ensures there is no deadlock when one goroutine stops listening on a channel and leaves the others stuck.

Contributor Author

I wrote this comment as a note to my future self: later on, we probably won't stop all the goroutines if, for example, a forwarding worker fails.

@ndakkoune ndakkoune requested a review from ViBiOh April 14, 2026 14:36
if service != "" {
continue
if service == "" {
service = tag[8:]
Contributor Author

More explicit with constants

@ndakkoune ndakkoune merged commit 0ffcb57 into nabil.dakkoune/go-forwarder Apr 15, 2026
10 checks passed
@ndakkoune ndakkoune deleted the nabil.dakkoune/AWSINTS-3450 branch April 15, 2026 12:00