diff --git a/app/app.go b/app/app.go index bad9997209..2f155648a9 100644 --- a/app/app.go +++ b/app/app.go @@ -364,6 +364,36 @@ type gigaBlockCache struct { baseFee *big.Int } +// gigaStaticEVMTx contains tx data that can be derived without touching chain state. +// Keep this state-free so future consensus can run it ahead of stateful block execution. +type gigaStaticEVMTx struct { + msg *evmtypes.MsgEVMTransaction + ethTx *ethtypes.Transaction + sender common.Address + seiAddr sdk.AccAddress +} + +type gigaStaticTxResult struct { + evm *gigaStaticEVMTx + errResult *abci.ExecTxResult + err error +} + +type gigaStaticBlockKey struct { + height int64 + id string +} + +type gigaStaticBlockResult struct { + typedTxs []sdk.Tx + staticTxs []*gigaStaticTxResult +} + +type gigaStaticBlockJob struct { + done chan struct{} + result *gigaStaticBlockResult +} + func newGigaBlockCache(ctx sdk.Context, keeper *gigaevmkeeper.Keeper) (*gigaBlockCache, error) { chainID := keeper.ChainID(ctx) gp := keeper.GetGasPool() @@ -443,6 +473,8 @@ type App struct { optimisticProcessingInfo OptimisticProcessingInfo optimisticProcessingInfoMutex sync.RWMutex + gigaStaticPipelineMutex sync.Mutex + gigaStaticPipeline map[gigaStaticBlockKey]*gigaStaticBlockJob txDecoder sdk.TxDecoder AnteHandler sdk.AnteHandler @@ -1296,6 +1328,14 @@ func (app *App) ProcessProposalHandler(ctx sdk.Context, req *abci.RequestProcess }, nil } + bpreq := &BlockProcessRequest{ + Hash: req.Hash, + ByzantineValidators: req.ByzantineValidators, + Height: req.Header.Height, + Time: req.Header.Time, + } + app.StartGigaStaticBlockProcessing(ctx, req.Txs, bpreq, typedTxs) + app.optimisticProcessingInfoMutex.Lock() shouldStartOptimisticProcessing := app.optimisticProcessingInfo.Completion == nil if shouldStartOptimisticProcessing { @@ -1321,12 +1361,6 @@ func (app *App) ProcessProposalHandler(ctx sdk.Context, req *abci.RequestProcess go func() { // ProcessBlock has panic recovery and returns error for any processing failures // All panics (including GetSigners) are handled in ProcessBlock, not affecting proposal acceptance - bpreq := &BlockProcessRequest{ - Hash: req.Hash, - ByzantineValidators: req.ByzantineValidators, - Height: req.Header.Height, - Time: req.Header.Time, - } events, txResults, endBlockResp, processErr := app.ProcessBlock(ctx, req.Txs, bpreq, req.ProposedLastCommit, false, typedTxs) app.optimisticProcessingInfoMutex.Lock() @@ -1515,7 +1549,202 @@ func (app *App) ProcessTxsSynchronousV2(ctx sdk.Context, txs [][]byte, typedTxs return txResults } -func (app *App) ProcessTxsSynchronousGiga(ctx sdk.Context, txs [][]byte, typedTxs []sdk.Tx) []*abci.ExecTxResult { +func execTxResultFromAnteError(err error) *abci.ExecTxResult { + codespace, code, log := sdkerrors.ABCIInfo(err, false) + return &abci.ExecTxResult{ + Codespace: codespace, + Code: code, + Log: log, + } +} + +func execTxResultFromGigaPanic(r interface{}) *abci.ExecTxResult { + if oogErr, isOOG := r.(sdk.ErrorOutOfGas); isOOG { + return &abci.ExecTxResult{ + Codespace: sdkerrors.RootCodespace, + Code: sdkerrors.ErrOutOfGas.ABCICode(), + Log: fmt.Sprintf("out of gas in location: %v", oogErr.Descriptor), + } + } + logger.Error("panic in giga static processor", "panic", r, "stack", string(debug.Stack())) + return &abci.ExecTxResult{ + Codespace: sdkerrors.UndefinedCodespace, + Code: sdkerrors.ErrPanic.ABCICode(), + Log: fmt.Sprintf("panic recovered: %v", r), + } +} + +func (app *App) prepareGigaStaticTx(ctx sdk.Context, tx sdk.Tx, chainID *big.Int) *gigaStaticTxResult { + evmMsg := app.GetEVMMsg(tx) + if evmMsg == nil { + return &gigaStaticTxResult{} + } + + // Validate the Cosmos SDK envelope and EVM stateless invariants before stateful execution. + // This is read-only and can be run independently for any block whose header is known. + if err := appante.EvmStatelessChecks(ctx, tx, chainID); err != nil { + return &gigaStaticTxResult{errResult: execTxResultFromAnteError(err)} + } + + ethTx, _ := evmMsg.AsTransaction() + if ethTx == nil { + return &gigaStaticTxResult{err: fmt.Errorf("failed to convert to eth transaction")} + } + + sender, seiAddr, _, recoverErr := evmante.RecoverSenderFromEthTx(ctx, ethTx, chainID) + if recoverErr != nil { + return &gigaStaticTxResult{errResult: &abci.ExecTxResult{ + Code: 1, + Log: fmt.Sprintf("failed to recover sender from signature: %v", recoverErr), + }} + } + + return &gigaStaticTxResult{evm: &gigaStaticEVMTx{ + msg: evmMsg, + ethTx: ethTx, + sender: sender, + seiAddr: seiAddr, + }} +} + +func (app *App) prepareGigaStaticTxWithRecovery(ctx sdk.Context, tx sdk.Tx, chainID *big.Int) (res *gigaStaticTxResult) { + defer func() { + if r := recover(); r != nil { + res = &gigaStaticTxResult{errResult: execTxResultFromGigaPanic(r)} + } + }() + return app.prepareGigaStaticTx(ctx, tx, chainID) +} + +func (app *App) prepareGigaStaticTxsConcurrently(ctx sdk.Context, typedTxs []sdk.Tx, chainID *big.Int) []*gigaStaticTxResult { + results := make([]*gigaStaticTxResult, len(typedTxs)) + if len(typedTxs) == 0 { + return results + } + + workers := app.ConcurrencyWorkers() + if workers < 1 || workers > len(typedTxs) { + workers = len(typedTxs) + } + + jobs := make(chan int, len(typedTxs)) + wg := sync.WaitGroup{} + for i := 0; i < workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for idx := range jobs { + results[idx] = app.prepareGigaStaticTxWithRecovery(ctx, typedTxs[idx], chainID) + } + }() + } + for idx := range typedTxs { + jobs <- idx + } + close(jobs) + wg.Wait() + return results +} + +func (app *App) prepareGigaStaticTxsByChecksumConcurrently(ctx sdk.Context, entries []*sdk.DeliverTxEntry, chainID *big.Int) map[[32]byte]*gigaStaticTxResult { + typedTxs := make([]sdk.Tx, len(entries)) + for i, entry := range entries { + typedTxs[i] = entry.SdkTx + } + + results := app.prepareGigaStaticTxsConcurrently(ctx, typedTxs, chainID) + byChecksum := make(map[[32]byte]*gigaStaticTxResult, len(entries)) + for i, entry := range entries { + byChecksum[entry.Checksum] = results[i] + } + return byChecksum +} + +const gigaStaticPipelineRetentionBlocks int64 = 8 + +func newGigaStaticBlockKey(height int64, hash []byte, txs [][]byte) gigaStaticBlockKey { + if len(hash) > 0 { + return gigaStaticBlockKey{height: height, id: hex.EncodeToString(hash)} + } + h := sha256.New() + for _, tx := range txs { + _, _ = h.Write([]byte(fmt.Sprintf("%d:", len(tx)))) + _, _ = h.Write(tx) + } + return gigaStaticBlockKey{height: height, id: hex.EncodeToString(h.Sum(nil))} +} + +func (app *App) pruneGigaStaticPipelineLocked(height int64) { + for key := range app.gigaStaticPipeline { + if key.height+gigaStaticPipelineRetentionBlocks < height { + delete(app.gigaStaticPipeline, key) + } + } +} + +func (app *App) runGigaStaticBlockJob(ctx sdk.Context, txs [][]byte, preDecoded []sdk.Tx, chainID *big.Int, job *gigaStaticBlockJob) { + defer close(job.done) + + typedTxs := preDecoded + if len(typedTxs) != len(txs) { + typedTxs = app.DecodeTxBytesConcurrently(txs) + } + job.result = &gigaStaticBlockResult{ + typedTxs: typedTxs, + staticTxs: app.prepareGigaStaticTxsConcurrently(ctx, typedTxs, chainID), + } +} + +// StartGigaStaticBlockProcessing begins state-free giga block processing in the background. +// Future block ingress can call this as soon as tx bytes are known, then ProcessBlock will +// consume the prepared result before stateful execution for that block. +func (app *App) StartGigaStaticBlockProcessing(ctx sdk.Context, txs [][]byte, req *BlockProcessRequest, preDecoded []sdk.Tx) { + if !app.GigaExecutorEnabled || req == nil { + return + } + + key := newGigaStaticBlockKey(req.Height, req.Hash, txs) + app.gigaStaticPipelineMutex.Lock() + if app.gigaStaticPipeline == nil { + app.gigaStaticPipeline = make(map[gigaStaticBlockKey]*gigaStaticBlockJob) + } + app.pruneGigaStaticPipelineLocked(req.Height) + if _, found := app.gigaStaticPipeline[key]; found { + app.gigaStaticPipelineMutex.Unlock() + return + } + job := &gigaStaticBlockJob{done: make(chan struct{})} + app.gigaStaticPipeline[key] = job + app.gigaStaticPipelineMutex.Unlock() + + chainID := app.GigaEvmKeeper.ChainID(ctx) + go app.runGigaStaticBlockJob(ctx, txs, preDecoded, chainID, job) +} + +func (app *App) waitForGigaStaticBlockProcessing(req *BlockProcessRequest, txs [][]byte) *gigaStaticBlockResult { + if req == nil { + return nil + } + + key := newGigaStaticBlockKey(req.Height, req.Hash, txs) + app.gigaStaticPipelineMutex.Lock() + job := app.gigaStaticPipeline[key] + app.gigaStaticPipelineMutex.Unlock() + if job == nil { + return nil + } + + <-job.done + + app.gigaStaticPipelineMutex.Lock() + if app.gigaStaticPipeline[key] == job { + delete(app.gigaStaticPipeline, key) + } + app.gigaStaticPipelineMutex.Unlock() + return job.result +} + +func (app *App) ProcessTxsSynchronousGiga(ctx sdk.Context, txs [][]byte, typedTxs []sdk.Tx, staticTxs []*gigaStaticTxResult) []*abci.ExecTxResult { blockProcessGigaStart := time.Now() defer func() { utilmetrics.BlockProcessLatency(blockProcessGigaStart, utilmetrics.Synchronous) // TODO(PLT-327): remove once app_block_process_duration_seconds verified @@ -1533,13 +1762,37 @@ func (app *App) ProcessTxsSynchronousGiga(ctx sdk.Context, txs [][]byte, typedTx logger.Error("failed to build giga block cache", "error", cacheErr, "height", ctx.BlockHeight()) return nil } + if len(staticTxs) != len(typedTxs) { + staticTxs = app.prepareGigaStaticTxsConcurrently(ctx, typedTxs, cache.chainID) + } + + recordGigaTxProcessed := func() { + utilmetrics.IncrTxProcessTypeCounter(utilmetrics.Synchronous) // TODO(PLT-327): remove once app_tx_process_type_total verified + appMetrics.txProcessType.Add(ctx.Context(), 1, + otelmetric.WithAttributes(attribute.String("type", utilmetrics.SynchronousGiga))) + } txResults := make([]*abci.ExecTxResult, len(txs)) for i, tx := range txs { ctx = ctx.WithTxIndex(i) - evmMsg := app.GetEVMMsg(typedTxs[i]) - // If not an EVM tx, fall back to v2 processing - if evmMsg == nil { + staticTx := staticTxs[i] + if staticTx != nil && staticTx.errResult != nil { + txResults[i] = staticTx.errResult + ctx.GigaMultiStore().WriteGiga() + recordGigaTxProcessed() + continue + } + if staticTx != nil && staticTx.err != nil { + txResults[i] = &abci.ExecTxResult{ + Code: 1, + Log: fmt.Sprintf("[BUG] giga static processing error: %v", staticTx.err), + } + recordGigaTxProcessed() + continue + } + + // If not an EVM tx, fall back to v2 processing. + if staticTx == nil || staticTx.evm == nil { result := app.DeliverTxWithResult(ctx, tx, typedTxs[i]) txResults[i] = result ms.Write() @@ -1574,19 +1827,7 @@ func (app *App) ProcessTxsSynchronousGiga(ctx sdk.Context, txs [][]byte, typedTx } }() - // Validate Cosmos SDK envelope (memo, timeoutHeight, signerInfos, etc.) - // This prevents consensus divergence if a malicious proposer includes invalid envelope fields. - if err := appante.EvmStatelessChecks(ctx, typedTxs[i], cache.chainID); err != nil { - codespace, code, log := sdkerrors.ABCIInfo(err, false) - result = &abci.ExecTxResult{ - Codespace: codespace, - Code: code, - Log: log, - } - return - } - - result, execErr = app.executeEVMTxWithGigaExecutor(ctx, evmMsg, cache) + result, execErr = app.executeEVMTxWithGigaExecutor(ctx, staticTx.evm, cache) }() if execErr != nil { @@ -1606,9 +1847,7 @@ func (app *App) ProcessTxsSynchronousGiga(ctx sdk.Context, txs [][]byte, typedTx txResults[i] = result ctx.GigaMultiStore().WriteGiga() - utilmetrics.IncrTxProcessTypeCounter(utilmetrics.Synchronous) // TODO(PLT-327): remove once app_tx_process_type_total verified - appMetrics.txProcessType.Add(ctx.Context(), 1, - otelmetric.WithAttributes(attribute.String("type", utilmetrics.SynchronousGiga))) + recordGigaTxProcessed() } return txResults @@ -1652,14 +1891,14 @@ func (app *App) CacheContext(ctx sdk.Context) (sdk.Context, sdk.CacheMultiStore) } // ExecuteTxsConcurrently calls the appropriate function for processing transacitons -func (app *App) ExecuteTxsConcurrently(ctx sdk.Context, txs [][]byte, typedTxs []sdk.Tx) ([]*abci.ExecTxResult, sdk.Context) { +func (app *App) ExecuteTxsConcurrently(ctx sdk.Context, txs [][]byte, typedTxs []sdk.Tx, staticTxs []*gigaStaticTxResult) ([]*abci.ExecTxResult, sdk.Context) { processSynchronously := app.shouldProcessSingleRecipientEVMTransfersSynchronously(typedTxs) // Giga only supports synchronous execution for now if app.GigaExecutorEnabled && app.GigaOCCEnabled && !processSynchronously { - return app.ProcessTXsWithOCCGiga(ctx, txs, typedTxs) + return app.ProcessTXsWithOCCGiga(ctx, txs, typedTxs, staticTxs) } else if app.GigaExecutorEnabled { - return app.ProcessTxsSynchronousGiga(ctx, txs, typedTxs), ctx + return app.ProcessTxsSynchronousGiga(ctx, txs, typedTxs, staticTxs), ctx } else if !ctx.IsOCCEnabled() { return app.ProcessTxsSynchronousV2(ctx, txs, typedTxs), ctx } @@ -1751,7 +1990,7 @@ func (app *App) ProcessTXsWithOCCV2(ctx sdk.Context, txs [][]byte, typedTxs []sd } // ProcessTXsWithOCCGiga runs the transactions concurrently via OCC, using the Giga executor -func (app *App) ProcessTXsWithOCCGiga(ctx sdk.Context, txs [][]byte, typedTxs []sdk.Tx) ([]*abci.ExecTxResult, sdk.Context) { +func (app *App) ProcessTXsWithOCCGiga(ctx sdk.Context, txs [][]byte, typedTxs []sdk.Tx, staticTxs []*gigaStaticTxResult) ([]*abci.ExecTxResult, sdk.Context) { blockProcessStart := time.Now() delegatedToV2 := false defer func() { @@ -1796,12 +2035,20 @@ func (app *App) ProcessTXsWithOCCGiga(ctx sdk.Context, txs [][]byte, typedTxs [] logger.Error("failed to build giga block cache", "error", cacheErr, "height", ctx.BlockHeight()) return nil, ctx } + staticTxsByChecksum := make(map[[32]byte]*gigaStaticTxResult, len(evmEntries)) + if len(staticTxs) == len(txs) { + for _, entry := range evmEntries { + staticTxsByChecksum[entry.Checksum] = staticTxs[entry.AbsoluteIndex] + } + } else { + staticTxsByChecksum = app.prepareGigaStaticTxsByChecksumConcurrently(evmCtx, evmEntries, cache.chainID) + } // Create OCC scheduler with giga executor deliverTx capturing the cache. evmScheduler := tasks.NewScheduler( app.ConcurrencyWorkers(), app.TracingInfo, - app.makeGigaDeliverTx(cache), + app.makeGigaDeliverTx(cache, staticTxsByChecksum), ) var evmSchedErr error @@ -1883,7 +2130,8 @@ func (app *App) ProcessTXsWithOCCGiga(ctx sdk.Context, txs [][]byte, typedTxs [] // ProcessBlock executes block transactions. If preDecoded is non-nil and len(preDecoded)==len(txs), // those decoded transactions are reused (bytes are not decoded again); EVM preprocessing still runs -// on the block context. +// on the block context. Giga static processing may have been started before this method and is +// consumed here before stateful execution. func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req *BlockProcessRequest, lastCommit abci.CommitInfo, simulate bool, preDecoded []sdk.Tx) (events []abci.Event, txResults []*abci.ExecTxResult, endBlockResp abci.ResponseEndBlock, err error) { defer func() { if r := recover(); r != nil { @@ -1921,15 +2169,26 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req *BlockProcessReq blockSpan.SetAttributes(attribute.Int64("height", req.Height)) ctx = ctx.WithTraceSpanContext(blockSpanCtx) + if app.GigaExecutorEnabled { + app.StartGigaStaticBlockProcessing(ctx, txs, req, preDecoded) + } + beginBlockResp := app.BeginBlock(ctx, req.Height, lastCommit.Votes, req.ByzantineValidators, true) events = append(events, beginBlockResp.Events...) evmTxs := make([]*evmtypes.MsgEVMTransaction, len(txs)) // nil for non-EVM txs var typedTxs []sdk.Tx - if len(preDecoded) == len(txs) { + var staticTxs []*gigaStaticTxResult + if app.GigaExecutorEnabled { + if staticBlock := app.waitForGigaStaticBlockProcessing(req, txs); staticBlock != nil { + typedTxs = staticBlock.typedTxs + staticTxs = staticBlock.staticTxs + } + } + if typedTxs == nil && len(preDecoded) == len(txs) { typedTxs = preDecoded app.FinalizeDecodedTransactionsConcurrently(ctx, typedTxs) - } else { + } else if typedTxs == nil { typedTxs = app.DecodeTransactionsConcurrently(ctx, txs) } @@ -1938,7 +2197,7 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req *BlockProcessReq } // Execute all transactions - txResults, ctx = app.ExecuteTxsConcurrently(ctx, txs, typedTxs) + txResults, ctx = app.ExecuteTxsConcurrently(ctx, txs, typedTxs, staticTxs) midBlockEvents := app.MidBlock(ctx, req.Height) events = append(events, midBlockEvents...) @@ -1970,25 +2229,16 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req *BlockProcessReq return events, txResults, endBlockResp, nil } -// executeEVMTxWithGigaExecutor executes a single EVM transaction using the giga executor. -// The sender address is recovered directly from the transaction signature - no Cosmos SDK ante handlers needed. -func (app *App) executeEVMTxWithGigaExecutor(ctx sdk.Context, msg *evmtypes.MsgEVMTransaction, cache *gigaBlockCache) (*abci.ExecTxResult, error) { - // Get the Ethereum transaction from the message - ethTx, _ := msg.AsTransaction() - if ethTx == nil { +// executeEVMTxWithGigaExecutor executes a single statically prepared EVM transaction using the giga executor. +func (app *App) executeEVMTxWithGigaExecutor(ctx sdk.Context, staticTx *gigaStaticEVMTx, cache *gigaBlockCache) (*abci.ExecTxResult, error) { + if staticTx == nil || staticTx.msg == nil || staticTx.ethTx == nil { return nil, fmt.Errorf("failed to convert to eth transaction") } - chainID := cache.chainID - - // Recover sender using the same logic as preprocess.go (version-based signer selection) - sender, seiAddr, _, recoverErr := evmante.RecoverSenderFromEthTx(ctx, ethTx, chainID) - if recoverErr != nil { - return &abci.ExecTxResult{ - Code: 1, - Log: fmt.Sprintf("failed to recover sender from signature: %v", recoverErr), - }, nil - } + msg := staticTx.msg + ethTx := staticTx.ethTx + sender := staticTx.sender + seiAddr := staticTx.seiAddr _, isAssociated := app.GigaEvmKeeper.GetEVMAddress(ctx, seiAddr) @@ -2219,7 +2469,7 @@ func (app *App) executeEVMTxWithGigaExecutor(ctx sdk.Context, msg *evmtypes.MsgE // gigaDeliverTx is the OCC-compatible deliverTx function for the giga executor. // makeGigaDeliverTx returns an OCC-compatible deliverTx callback that captures the given // block cache, avoiding mutable state on App for cache lifecycle management. -func (app *App) makeGigaDeliverTx(cache *gigaBlockCache) func(sdk.Context, abci.RequestDeliverTxV2, sdk.Tx, [32]byte) abci.ResponseDeliverTx { +func (app *App) makeGigaDeliverTx(cache *gigaBlockCache, staticTxsByChecksum map[[32]byte]*gigaStaticTxResult) func(sdk.Context, abci.RequestDeliverTxV2, sdk.Tx, [32]byte) abci.ResponseDeliverTx { return func(ctx sdk.Context, req abci.RequestDeliverTxV2, tx sdk.Tx, checksum [32]byte) (resp abci.ResponseDeliverTx) { defer func() { if r := recover(); r != nil { @@ -2250,23 +2500,31 @@ func (app *App) makeGigaDeliverTx(cache *gigaBlockCache) func(sdk.Context, abci. } }() - evmMsg := app.GetEVMMsg(tx) - if evmMsg == nil { - return abci.ResponseDeliverTx{Code: 1, Log: "not an EVM transaction"} + staticTx := staticTxsByChecksum[checksum] + if staticTx == nil { + staticTx = app.prepareGigaStaticTxWithRecovery(ctx, tx, cache.chainID) } - - // Validate Cosmos SDK envelope (memo, timeoutHeight, signerInfos, etc.) - // This prevents consensus divergence if a malicious proposer includes invalid envelope fields. - if err := appante.EvmStatelessChecks(ctx, tx, cache.chainID); err != nil { - codespace, code, log := sdkerrors.ABCIInfo(err, false) + if staticTx.errResult != nil { return abci.ResponseDeliverTx{ - Codespace: codespace, - Code: code, - Log: log, + Code: staticTx.errResult.Code, + Data: staticTx.errResult.Data, + Log: staticTx.errResult.Log, + Info: staticTx.errResult.Info, + GasWanted: staticTx.errResult.GasWanted, + GasUsed: staticTx.errResult.GasUsed, + Events: staticTx.errResult.Events, + Codespace: staticTx.errResult.Codespace, + EvmTxInfo: staticTx.errResult.EvmTxInfo, } } + if staticTx.err != nil { + return abci.ResponseDeliverTx{Code: 1, Log: fmt.Sprintf("giga static processing error: %v", staticTx.err)} + } + if staticTx.evm == nil { + return abci.ResponseDeliverTx{Code: 1, Log: "not an EVM transaction"} + } - result, err := app.executeEVMTxWithGigaExecutor(ctx, evmMsg, cache) + result, err := app.executeEVMTxWithGigaExecutor(ctx, staticTx.evm, cache) if err != nil { // Check if this is a fail-fast error (Cosmos precompile interop detected) if gigautils.ShouldExecutionAbort(err) { diff --git a/app/giga_static_test.go b/app/giga_static_test.go new file mode 100644 index 0000000000..ff9db31613 --- /dev/null +++ b/app/giga_static_test.go @@ -0,0 +1,133 @@ +package app + +import ( + "encoding/hex" + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/sei-protocol/sei-chain/sei-cosmos/crypto/keys/secp256k1" + sdk "github.com/sei-protocol/sei-chain/sei-cosmos/types" + "github.com/sei-protocol/sei-chain/x/evm/config" + evmtypes "github.com/sei-protocol/sei-chain/x/evm/types" + "github.com/sei-protocol/sei-chain/x/evm/types/ethtx" + "github.com/stretchr/testify/require" +) + +func TestPrepareGigaStaticTxsConcurrentlyPrecomputesEVMMeta(t *testing.T) { + tm := time.Now().UTC() + valPub := secp256k1.GenPrivKey().PubKey() + wrapper := NewTestWrapper(t, tm, valPub, false) + + privKey := secp256k1.GenPrivKey() + key, err := crypto.HexToECDSA(hex.EncodeToString(privKey.Bytes())) + require.NoError(t, err) + + to := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") + txData := ðtypes.LegacyTx{ + Nonce: 1, + GasPrice: big.NewInt(10), + Gas: 21000, + To: &to, + } + chainCfg := evmtypes.DefaultChainConfig() + ethCfg := chainCfg.EthereumConfig(big.NewInt(config.DefaultChainID)) + signer := ethtypes.MakeSigner(ethCfg, big.NewInt(wrapper.Ctx.BlockHeight()), uint64(wrapper.Ctx.BlockTime().Unix())) + ethTx, err := ethtypes.SignTx(ethtypes.NewTx(txData), signer, key) + require.NoError(t, err) + + ethTxData, err := ethtx.NewTxDataFromTx(ethTx) + require.NoError(t, err) + msg, err := evmtypes.NewMsgEVMTransaction(ethTxData) + require.NoError(t, err) + + txBuilder := wrapper.App.GetTxConfig().NewTxBuilder() + require.NoError(t, txBuilder.SetMsgs(msg)) + txBytes, err := wrapper.App.GetTxConfig().TxEncoder()(txBuilder.GetTx()) + require.NoError(t, err) + + typedTxs := wrapper.App.DecodeTxBytesConcurrently([][]byte{txBytes}) + require.NotNil(t, typedTxs[0]) + + chainID := wrapper.App.GigaEvmKeeper.ChainID(wrapper.Ctx) + staticTxs := wrapper.App.prepareGigaStaticTxsConcurrently(wrapper.Ctx, typedTxs, chainID) + + require.Len(t, staticTxs, 1) + require.NotNil(t, staticTxs[0]) + require.Nil(t, staticTxs[0].errResult) + require.NoError(t, staticTxs[0].err) + require.NotNil(t, staticTxs[0].evm) + require.Equal(t, ethTx.Hash(), staticTxs[0].evm.ethTx.Hash()) + require.Equal(t, crypto.PubkeyToAddress(key.PublicKey), staticTxs[0].evm.sender) + require.Equal(t, sdk.AccAddress(privKey.PubKey().Address()), staticTxs[0].evm.seiAddr) +} + +func TestGigaStaticBlockPipelineAllowsMultipleHeights(t *testing.T) { + tm := time.Now().UTC() + valPub := secp256k1.GenPrivKey().PubKey() + wrapper := NewTestWrapper(t, tm, valPub, false) + wrapper.App.GigaExecutorEnabled = true + + txBytes, sender, seiAddr := buildGigaStaticTestTx(t, wrapper, 1) + txs := [][]byte{txBytes} + ctxH := wrapper.Ctx.WithBlockHeight(10) + ctxNext := wrapper.Ctx.WithBlockHeight(11) + reqH := &BlockProcessRequest{Height: 10, Hash: []byte("block-h"), Time: tm} + reqNext := &BlockProcessRequest{Height: 11, Hash: []byte("block-h-plus-1"), Time: tm} + + wrapper.App.StartGigaStaticBlockProcessing(ctxNext, txs, reqNext, nil) + wrapper.App.StartGigaStaticBlockProcessing(ctxH, txs, reqH, nil) + + staticH := wrapper.App.waitForGigaStaticBlockProcessing(reqH, txs) + require.NotNil(t, staticH) + require.Len(t, staticH.typedTxs, 1) + require.Len(t, staticH.staticTxs, 1) + require.Equal(t, sender, staticH.staticTxs[0].evm.sender) + require.Equal(t, seiAddr, staticH.staticTxs[0].evm.seiAddr) + + wrapper.App.gigaStaticPipelineMutex.Lock() + _, nextStillPrepared := wrapper.App.gigaStaticPipeline[newGigaStaticBlockKey(reqNext.Height, reqNext.Hash, txs)] + wrapper.App.gigaStaticPipelineMutex.Unlock() + require.True(t, nextStillPrepared, "future block static work should remain available while current block is consumed") + + staticNext := wrapper.App.waitForGigaStaticBlockProcessing(reqNext, txs) + require.NotNil(t, staticNext) + require.Equal(t, sender, staticNext.staticTxs[0].evm.sender) + require.Equal(t, seiAddr, staticNext.staticTxs[0].evm.seiAddr) +} + +func buildGigaStaticTestTx(t *testing.T, wrapper *TestWrapper, nonce uint64) ([]byte, common.Address, sdk.AccAddress) { + t.Helper() + + privKey := secp256k1.GenPrivKey() + key, err := crypto.HexToECDSA(hex.EncodeToString(privKey.Bytes())) + require.NoError(t, err) + + to := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") + txData := ðtypes.LegacyTx{ + Nonce: nonce, + GasPrice: big.NewInt(10), + Gas: 21000, + To: &to, + } + chainCfg := evmtypes.DefaultChainConfig() + ethCfg := chainCfg.EthereumConfig(big.NewInt(config.DefaultChainID)) + signer := ethtypes.MakeSigner(ethCfg, big.NewInt(wrapper.Ctx.BlockHeight()), uint64(wrapper.Ctx.BlockTime().Unix())) + ethTx, err := ethtypes.SignTx(ethtypes.NewTx(txData), signer, key) + require.NoError(t, err) + + ethTxData, err := ethtx.NewTxDataFromTx(ethTx) + require.NoError(t, err) + msg, err := evmtypes.NewMsgEVMTransaction(ethTxData) + require.NoError(t, err) + + txBuilder := wrapper.App.GetTxConfig().NewTxBuilder() + require.NoError(t, txBuilder.SetMsgs(msg)) + txBytes, err := wrapper.App.GetTxConfig().TxEncoder()(txBuilder.GetTx()) + require.NoError(t, err) + + return txBytes, crypto.PubkeyToAddress(key.PublicKey), sdk.AccAddress(privKey.PubKey().Address()) +}