[AWSINTS-3450] feat(go-forwarder): add CloudwatchLogEntry creation logic (#1094)
Conversation
if data.MessageType == "CONTROL_MESSAGE" {
	return
}
Not present in the Python implementation. The control messages are probing messages from CloudWatch to verify that the Lambda is reachable, so they are just noise.
	"github.com/aws/aws-lambda-go/lambdacontext"
)

func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) {
💬 suggestion: The naming of the function is a bit confusing. Usually a parser is a "pure" function: you give it an input, you get back an output and an error.
Here, the errors are logged directly and we output to the chan. I think we should make this function "pure" (easier to test, because it is currently untested) and let the Parse function that calls it deal with the error and with forwarding to the chan.
Edit: I see below that we have the log entries in memory and we stream them, so maybe just add an intermediate function handleCloudwatchLogs that reads the parse result and then streams it.
Great. I kept the channel approach with only one goroutine for CloudWatch. The S3 handler will probably trigger more goroutines and leverage streaming.
select {
case out <- entry:
case <-ctx.Done():
	return
}
🥜 nitpick: While it "would" work as expected, it's not deterministic. Have a look at the internal repository; there is a SafeSender method you can copy/paste into a util package here.
if strings.HasPrefix(logStream, "states/") {
	source = "stepfunction"
}
💬 suggestion: You can move this after the source override and change it to a definitive return (doing a fast exit).
if lc, ok := lambdacontext.FromContext(ctx); ok {
	metadata.InvokedFunctionARN = lc.InvokedFunctionArn
} else {
	slog.Warn("failed lambda context loading")
❓ question: What's the actionable thing to do here? We should either enrich the log with some field or remove it.
This one is a safety net. The only ways the else case can be hit are (1) when the code is not run from AWS Lambda (e.g. in tests) or (2) when someone passes a plain context.Background(). It should NEVER be hit in production, and if it is, the default string value "" will be sent to the backend (we would notice it with a status:skipped).
You should change the logged message to this explanation.
My point is: if a customer sees that in 6 months, the log should explain the situation clearly.
Force-pushed 173fefa to 26ba416
	"strings"
)

type Tags []string
This little workaround is necessary due to constraints on the backend side.
json.Marshal converts a slice to a JSON array (the expected behavior), but the backend needs a string of comma-separated key:value pairs, hence this custom MarshalJSON.
Force-pushed f7ac191 to 493ec22
…dler return the error and update main with source detection + pipeline trigger
Force-pushed dd5ed1c to 4dbd0e9
case parsing.InvocationSourceCloudwatchLogs:
	return pipeline.Run(ctx, event, cfg, parsing.HandleCloudwatchLogs)
default:
	slog.Error("unsupported invocation source")
	return err
}
if !ok {
	return nil
We have 3 workers. If any worker encounters an error, it exits and stops dequeuing from the input channel.
If all 3 workers fail, the in chan will be full and nothing will dequeue it, introducing a deadlock for the upstream goroutine.
The only call to Forward is here, but the ctx is not managed by the errgroup.Group, so if a goroutine fails, no signal is sent to the other goroutines to stop.
We should fix the context cancellation in the pipeline file and maybe leave a TODO here to handle the retry mechanism (later in the migration).
func HandleCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) error {
	logEntries, err := parseCloudwatchLogs(ctx, event, cfg)
	if err != nil {
		slog.Error("failed parse cloudwatch logs", slog.Any("error", err))
❓ question: We don't return after the parsing error. Should we return the error and let the caller log it, or return nil?
The logEntries below will very likely be nil/empty, so we would return nil in the end. I think we should return the error: if we can't parse the CloudWatch message, it's very likely a broad failure, and we shouldn't pretend everything is fine unless someone happens to look at the logs.
I'm going for the return err.
var source string
if strings.Contains(logStream, "_CloudTrail_") {
	source = "cloudtrail"
} else {
	source = getSourceFromLogGroup(strings.ToLower(logGroup))
}
return source
Same here regarding the fast exit: if you have your answer, return from the function directly. This avoids nested instructions and gives a clear execution path through the function.
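A fast-exit version of the source detection discussed in this thread might look like the sketch below. Each branch returns as soon as it has an answer, so there is no mutable `source` variable. The helper names and the log-group table mirror the diff but are assumptions, not the PR's exact implementation:

```go
package main

import (
	"fmt"
	"strings"
)

// detectSource returns the log source as soon as a rule matches.
func detectSource(logGroup, logStream string) string {
	if strings.Contains(logStream, "_CloudTrail_") {
		return "cloudtrail"
	}
	if strings.HasPrefix(logStream, "states/") {
		return "stepfunction"
	}
	return getSourceFromLogGroup(strings.ToLower(logGroup))
}

// getSourceFromLogGroup is a stand-in for the PR's log-group lookup.
func getSourceFromLogGroup(logGroup string) string {
	switch {
	case strings.Contains(logGroup, "/aws/lambda"):
		return "lambda"
	case strings.Contains(logGroup, "/aws/rds"):
		return "rds"
	default:
		return "cloudwatch"
	}
}

func main() {
	fmt.Println(detectSource("/aws/lambda/foo", "2024/01/01/[$LATEST]abc")) // lambda
	fmt.Println(detectSource("grp", "states/my-machine"))                   // stepfunction
}
```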
)

type invocationSource int
//go:generate stringer -type InvocationSource -trimprefix InvocationSource
var service string

if cfg.CustomTags != "" {
	for _, tag := range strings.Split(cfg.CustomTags, ",") {
I think your code editor will suggest using SplitSeq.
	cfg *config.Config,
	handler func(context.Context, json.RawMessage, *config.Config, chan<- T) error,
) error {
	g, ctx := errgroup.WithContext(ctx) // Fragile because if one goroutine fails, all stages will stop gracefully
📝 note: I don't think it's "fragile" in this case. It's the graceful-shutdown management, and it ensures that there won't be a deadlock where one goroutine quits listening to a chan and leaves the others stuck.
I wrote this comment as a note to my future self, since we probably won't want to stop all the goroutines if, for example, a forwarding worker fails.
Co-authored-by: Vincent Boutour <vincent.boutour@datadoghq.com>
if service != "" {
	continue
if service == "" {
	service = tag[8:]
This would be more explicit with constants.
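The magic `tag[8:]` above relies on len("service:") being 8; a named constant makes the slicing self-explanatory. A sketch of the idea, with `serviceFromTags` as a hypothetical helper mirroring the loop in the diff:

```go
package main

import (
	"fmt"
	"strings"
)

// servicePrefix makes the tag[8:] slicing above self-documenting.
const servicePrefix = "service:"

// serviceFromTags returns the value of the first service: tag, if any.
func serviceFromTags(tags []string) string {
	for _, tag := range tags {
		if strings.HasPrefix(tag, servicePrefix) {
			return tag[len(servicePrefix):]
		}
	}
	return ""
}

func main() {
	fmt.Println(serviceFromTags([]string{"env:prod", "service:checkout"})) // checkout
}
```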
What does this PR do?
Add (Cloudwatch)LogEntry creation logic (source detection, overriding tags, etc.)
Motivation
Testing Guidelines
None for now; this is a request for quick feedback before writing the tests.
Additional Notes
Except for the CONTROL_MESSAGE skipping, the behavior is roughly the same as the Python one.
Types of changes
Check all that apply