Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ require (
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260518100439-9564f35fd264
github.com/smartcontractkit/chainlink-common/keystore v1.1.0
github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20260512150409-b4068bf735e6
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260514104516-a827acdffe43
github.com/smartcontractkit/libocr v0.0.0-20260508200755-99940c85383c
github.com/smartcontractkit/wsrpc v0.8.5-0.20250502134807-c57d3d995945
github.com/stretchr/testify v1.11.1
go.uber.org/zap v1.28.0
golang.org/x/exp v0.0.0-20260508232706-74f9aab9d74a
golang.org/x/sync v0.20.0
google.golang.org/grpc v1.81.0
Expand Down Expand Up @@ -142,7 +144,6 @@ require (
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/smartcontractkit/chain-selectors v1.0.100 // indirect
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 // indirect
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260514104516-a827acdffe43 // indirect
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20260512230622-65f10f4cd305 // indirect
github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260512230622-65f10f4cd305 // indirect
github.com/smartcontractkit/freeport v0.1.3-0.20250828155247-add56fa28aad // indirect
Expand Down Expand Up @@ -182,7 +183,6 @@ require (
go.opentelemetry.io/proto/otlp v1.10.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.28.0 // indirect
go.yaml.in/yaml/v2 v2.4.4 // indirect
go.yaml.in/yaml/v4 v4.0.0-rc.4 // indirect
golang.org/x/crypto v0.51.0 // indirect
Expand Down
166 changes: 166 additions & 0 deletions llo/cre/report_codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package cre

import (
"bytes"
"encoding/json"
"errors"
"fmt"

"github.com/shopspring/decimal"
"google.golang.org/protobuf/proto"

commonds "github.com/smartcontractkit/chainlink-common/pkg/capabilities/datastreams"
capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
datastreamsllo "github.com/smartcontractkit/chainlink-data-streams/llo"
"github.com/smartcontractkit/chainlink-protos/cre/go/values"
)

var _ datastreamsllo.ReportCodec = ReportCodecCapabilityTrigger{}

type ReportCodecCapabilityTrigger struct {
lggr logger.Logger
donID uint32
}

func NewReportCodecCapabilityTrigger(lggr logger.Logger, donID uint32) ReportCodecCapabilityTrigger {
return ReportCodecCapabilityTrigger{lggr, donID}
}

type ReportCodecCapabilityTriggerMultiplier struct {
Multiplier decimal.Decimal `json:"multiplier"`
StreamID llotypes.StreamID `json:"streamID"`
}

// Opts format remains unchanged
type ReportCodecCapabilityTriggerOpts struct {
// EXAMPLE
//
// [{streamID: 1000000001, "multiplier":"10000"}, ...]
//
// The total number of streams must be n, where n is the number of
// top-level elements in this ReportCodecCapabilityTriggerMultipliers array
Multipliers []ReportCodecCapabilityTriggerMultiplier `json:"multipliers"`
}

func (r *ReportCodecCapabilityTriggerOpts) Decode(opts []byte) error {
if len(opts) == 0 {
return nil
}
decoder := json.NewDecoder(bytes.NewReader(opts))
decoder.DisallowUnknownFields() // Error on unrecognized fields
return decoder.Decode(r)
}

func (r *ReportCodecCapabilityTriggerOpts) Encode() ([]byte, error) {
return json.Marshal(r)
}

// Encode a report into a capability trigger report
// the returned byte slice is the marshaled protobuf of [capabilitiespb.OCRTriggerReport]
func (r ReportCodecCapabilityTrigger) Encode(report datastreamsllo.Report, cd llotypes.ChannelDefinition, optsCache *datastreamsllo.OptsCache) ([]byte, error) {
if len(cd.Streams) != len(report.Values) {
// Invariant violation
return nil, fmt.Errorf("capability trigger expected %d streams, got %d", len(cd.Streams), len(report.Values))
}
if report.Specimen {
// Not supported for now
return nil, errors.New("capability trigger encoder does not currently support specimen reports")
}

var opts ReportCodecCapabilityTriggerOpts
var err error
opts, err = datastreamsllo.GetOpts[ReportCodecCapabilityTriggerOpts](optsCache, report.ChannelID)
if err != nil {
return nil, fmt.Errorf("failed to get opts: %w", err)
}

payload := make([]*commonds.LLOStreamDecimal, len(report.Values))
for i, stream := range report.Values {
var d []byte
switch v := stream.(type) {
case nil:
// Missing observations are nil
case *datastreamsllo.Decimal:
multipliedStreamValue := v.Decimal()

if len(opts.Multipliers) != 0 {
multipliedStreamValue = multipliedStreamValue.Mul(opts.Multipliers[i].Multiplier)
}

var err error
d, err = multipliedStreamValue.MarshalBinary()
if err != nil {
return nil, fmt.Errorf("failed to marshal decimal: %w", err)
}
default:
return nil, fmt.Errorf("only decimal StreamValues are supported, got: %T", stream)
}
payload[i] = &commonds.LLOStreamDecimal{
StreamID: cd.Streams[i].StreamID,
Decimal: d,
}
}

ste := commonds.LLOStreamsTriggerEvent{
Payload: payload,
ObservationTimestampNanoseconds: report.ObservationTimestampNanoseconds,
}
outputs, err := values.WrapMap(ste)
if err != nil {
return nil, fmt.Errorf("failed to wrap map: %w", err)
}
p := &capabilitiespb.OCRTriggerReport{
EventID: r.EventID(report),
Timestamp: report.ObservationTimestampNanoseconds,
Outputs: values.ProtoMap(outputs),
}

b, err := proto.MarshalOptions{Deterministic: true}.Marshal(p)
if err != nil {
return nil, fmt.Errorf("failed to marshal capability trigger report: %w", err)
}
return b, nil
}

func (r ReportCodecCapabilityTrigger) Verify(cd llotypes.ChannelDefinition) error {
opts := new(ReportCodecCapabilityTriggerOpts)
if err := opts.Decode(cd.Opts); err != nil {
return fmt.Errorf("invalid Opts, got: %q; %w", cd.Opts, err)
}
if opts != nil && opts.Multipliers != nil {
if len(opts.Multipliers) != len(cd.Streams) {
return fmt.Errorf("multipliers length %d != StreamValues length %d", len(opts.Multipliers), len(cd.Streams))
}

for i, stream := range cd.Streams {
if opts.Multipliers[i].StreamID != stream.StreamID {
return fmt.Errorf("LLO StreamID %d mismatched with Multiplier StreamID %d", stream.StreamID, opts.Multipliers[i].StreamID)
}
if !(opts.Multipliers[i].Multiplier.IsInteger()) {
return fmt.Errorf("multiplier for StreamID %d must be an integer", opts.Multipliers[i].StreamID)
}
if opts.Multipliers[i].Multiplier.IsZero() {
return fmt.Errorf("multiplier for StreamID %d can't be zero", opts.Multipliers[i].StreamID)
}
if opts.Multipliers[i].Multiplier.IsNegative() {
return fmt.Errorf("multiplier for StreamID %d can't be negative", opts.Multipliers[i].StreamID)
}
}
}
return nil
}

// EventID is expected to uniquely identify a (don, round)
func (r ReportCodecCapabilityTrigger) EventID(report datastreamsllo.Report) string {
return fmt.Sprintf("streams_%d_%d", r.donID, report.ObservationTimestampNanoseconds)
}

func (r ReportCodecCapabilityTrigger) ParseOpts(opts []byte) (any, error) {
var o ReportCodecCapabilityTriggerOpts
if err := o.Decode(opts); err != nil {
return nil, fmt.Errorf("failed to decode opts; got: '%s'; %w", opts, err)
}
return o, nil
}
Loading
Loading