|
18 | 18 | from __future__ import annotations |
19 | 19 |
|
20 | 20 | import base64 |
| 21 | +import hashlib |
21 | 22 | import os |
22 | 23 | from collections.abc import Callable |
23 | 24 | from typing import Any, cast |
@@ -512,10 +513,16 @@ def test_sigv4_sign_request_without_body(rest_mock: Mocker) -> None: |
512 | 513 | assert isinstance(adapter, HTTPAdapter) |
513 | 514 | adapter.add_headers(prepared) |
514 | 515 |
|
515 | | - assert prepared.headers["Authorization"].startswith("AWS4-HMAC-SHA256 Credential=") |
| 516 | + auth_header = prepared.headers["Authorization"] |
| 517 | + assert auth_header.startswith("AWS4-HMAC-SHA256 Credential=") |
516 | 518 | assert prepared.headers["Original-Authorization"] == f"Bearer {existing_token}" |
517 | 519 | assert prepared.headers["x-amz-content-sha256"] == EMPTY_BODY_SHA256 |
518 | | - assert "SignedHeaders=" in prepared.headers["Authorization"] |
| 520 | + # Verify the signature format: Credential, SignedHeaders, Signature |
| 521 | + assert "Credential=" in auth_header |
| 522 | + assert "SignedHeaders=" in auth_header |
| 523 | + assert "Signature=" in auth_header |
| 524 | + # x-amz-content-sha256 should be in signed headers |
| 525 | + assert "x-amz-content-sha256" in auth_header |
519 | 526 |
|
520 | 527 |
|
521 | 528 | def test_sigv4_sign_request_with_body(rest_mock: Mocker) -> None: |
@@ -544,11 +551,19 @@ def test_sigv4_sign_request_with_body(rest_mock: Mocker) -> None: |
544 | 551 | assert isinstance(adapter, HTTPAdapter) |
545 | 552 | adapter.add_headers(prepared) |
546 | 553 |
|
547 | | - assert prepared.headers["Authorization"].startswith("AWS4-HMAC-SHA256 Credential=") |
548 | | - assert "SignedHeaders=" in prepared.headers["Authorization"] |
| 554 | + auth_header = prepared.headers["Authorization"] |
| 555 | + assert auth_header.startswith("AWS4-HMAC-SHA256 Credential=") |
| 556 | + assert "SignedHeaders=" in auth_header |
549 | 557 | # Conflicting Authorization header is relocated |
550 | 558 | assert prepared.headers["Original-Authorization"] == f"Bearer {existing_token}" |
551 | | - assert prepared.headers["x-amz-content-sha256"] == "nhKdVGKGU3IMGjYlod9xKUVc7/H5K6zTWj60yJOM80k=" |
| 559 | + # Non-empty body should have base64-encoded SHA256 |
| 560 | + content_sha256 = prepared.headers["x-amz-content-sha256"] |
| 561 | + assert content_sha256 == "nhKdVGKGU3IMGjYlod9xKUVc7/H5K6zTWj60yJOM80k=" |
| 562 | + # Verify it's valid base64 and matches the body |
| 563 | + decoded = base64.b64decode(content_sha256) |
| 564 | + assert len(decoded) == 32 # SHA256 produces 32 bytes |
| 565 | + # x-amz-content-sha256 should be in signed headers |
| 566 | + assert "x-amz-content-sha256" in auth_header |
552 | 567 |
|
553 | 568 |
|
554 | 569 | def test_sigv4_content_sha256_with_bytes_body(rest_mock: Mocker) -> None: |
@@ -580,7 +595,12 @@ def test_sigv4_content_sha256_with_bytes_body(rest_mock: Mocker) -> None: |
580 | 595 |
|
581 | 596 | assert prepared.headers["Authorization"].startswith("AWS4-HMAC-SHA256 Credential=") |
582 | 597 | assert "SignedHeaders=" in prepared.headers["Authorization"] |
583 | | - assert prepared.headers["x-amz-content-sha256"] == "sD20bEQP+WnwKPT7jxn7PIACGciAeWjQPlzFCK5Fifo=" |
| 598 | + content_sha256 = prepared.headers["x-amz-content-sha256"] |
| 599 | + assert content_sha256 == "sD20bEQP+WnwKPT7jxn7PIACGciAeWjQPlzFCK5Fifo=" |
| 600 | + # Verify it's valid base64 and matches the body |
| 601 | + decoded = base64.b64decode(content_sha256) |
| 602 | + assert len(decoded) == 32 # SHA256 produces 32 bytes |
| 603 | + assert decoded == hashlib.sha256(body_content).digest() |
584 | 604 |
|
585 | 605 |
|
586 | 606 | def test_sigv4_conflicting_sigv4_headers(rest_mock: Mocker) -> None: |
|
0 commit comments