diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 76f4475b..c63b819b 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -628,6 +628,9 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non data = data if isinstance(data, list) else [data] + if charged_event_name and charged_event_name.startswith('apify-'): + raise ValueError(f'Cannot charge for synthetic event "{charged_event_name}" manually') + # No charging, just push the data without locking. if charged_event_name is None: dataset = await self.open_dataset() @@ -637,13 +640,12 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non # If charging is requested, acquire the charge lock to prevent race conditions between concurrent # push_data calls. We need to hold the lock for the entire push_data + charge sequence. async with self._charge_lock: - max_charged_count = self.get_charging_manager().calculate_max_event_charge_count_within_limit( - charged_event_name + pushed_items_count = self.get_charging_manager().calculate_push_data_limit( + items_count=len(data), + event_name=charged_event_name, + is_default_dataset=True, ) - # Push as many items as we can charge for. - pushed_items_count = min(max_charged_count, len(data)) if max_charged_count is not None else len(data) - dataset = await self.open_dataset() if pushed_items_count < len(data): @@ -651,6 +653,7 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non elif pushed_items_count > 0: await dataset.push_data(data) + # Only charge explicit events; synthetic events will be processed within the client. return await self.get_charging_manager().charge( event_name=charged_event_name, count=pushed_items_count, diff --git a/src/apify/_charging.py b/src/apify/_charging.py index 7e2bfeab..fd4b6163 100644 --- a/src/apify/_charging.py +++ b/src/apify/_charging.py @@ -1,6 +1,7 @@ from __future__ import annotations import math +from contextvars import ContextVar from dataclasses import dataclass from datetime import datetime, timezone from decimal import Decimal @@ -31,6 +32,14 @@ run_validator = TypeAdapter[ActorRun | None](ActorRun | None) +DEFAULT_DATASET_ITEM_EVENT = 'apify-default-dataset-item' + +# Context variable to hold the current `ChargingManager` instance, if any. This allows PPE-aware dataset clients to +# access the charging manager without needing to pass it explicitly. +charging_manager_ctx: ContextVar[ChargingManagerImplementation | None] = ContextVar( + 'charging_manager_ctx', default=None +) + @docs_group('Charging') class ChargingManager(Protocol): @@ -81,6 +90,28 @@ def get_charged_event_count(self, event_name: str) -> int: def get_max_total_charge_usd(self) -> Decimal: """Get the configured maximum total charge for this Actor run.""" + def calculate_push_data_limit( + self, + items_count: int, + event_name: str, + *, + is_default_dataset: bool, + ) -> int: + """Calculate how many items can be pushed and charged within the current budget. + + Accounts for both the explicit event and the synthetic `DEFAULT_DATASET_ITEM_EVENT` event, + so that the combined cost per item does not exceed the remaining budget. + + Args: + items_count: The number of items to be pushed. + event_name: The explicit event name to charge for each item. + is_default_dataset: Whether the data is pushed to the default dataset. + If True, the synthetic event cost is included in the combined price. + + Returns: + Max number of items that can be pushed within the budget. + """ + @docs_group('Charging') @dataclass(frozen=True) @@ -190,6 +221,11 @@ async def __aenter__(self) -> None: self._charging_log_dataset = await Dataset.open(name=self.LOCAL_CHARGING_LOG_DATASET_NAME) + # if the Actor runs with the pay-per-event pricing model, set the context variable so that PPE-aware dataset + # clients can access the charging manager and charge for synthetic events. + if self._pricing_model == 'PAY_PER_EVENT': + charging_manager_ctx.set(self) + async def __aexit__( self, exc_type: type[BaseException] | None, @@ -199,6 +235,7 @@ async def __aexit__( if not self.active: raise RuntimeError('Exiting an uninitialized ChargingManager') + charging_manager_ctx.set(None) self.active = False @ensure_context @@ -258,7 +295,11 @@ def calculate_chargeable() -> dict[str, int | None]: if self._actor_run_id is None: raise RuntimeError('Actor run ID not configured') - if event_name in self._pricing_info: + if event_name.startswith('apify-'): + # Synthetic events (e.g. apify-default-dataset-item) are tracked internally only, + # the platform handles them automatically based on dataset writes. + pass + elif event_name in self._pricing_info: await self._client.run(self._actor_run_id).charge(event_name, charged_count) else: logger.warning(f"Attempting to charge for an unknown event '{event_name}'") @@ -300,14 +341,7 @@ def calculate_total_charged_amount(self) -> Decimal: @ensure_context def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int | None: - pricing_info = self._pricing_info.get(event_name) - - if pricing_info is not None: - price = pricing_info.price - elif not self._is_at_home: - price = Decimal(1) # Use a nonzero price for local development so that the maximum budget can be reached - else: - price = Decimal() + price = self._get_event_price(event_name) if not price: return None @@ -337,6 +371,25 @@ def get_charged_event_count(self, event_name: str) -> int: def get_max_total_charge_usd(self) -> Decimal: return self._max_total_charge_usd + @ensure_context + def calculate_push_data_limit( + self, + items_count: int, + event_name: str, + *, + is_default_dataset: bool, + ) -> int: + explicit_price = self._get_event_price(event_name) + synthetic_price = self._get_event_price(DEFAULT_DATASET_ITEM_EVENT) if is_default_dataset else Decimal(0) + combined_price = explicit_price + synthetic_price + + if not combined_price: + return items_count + + result = (self._max_total_charge_usd - self.calculate_total_charged_amount()) / combined_price + max_count = max(0, math.floor(result)) if result.is_finite() else items_count + return min(items_count, max_count) + async def _fetch_pricing_info(self) -> _FetchedPricingInfoDict: """Fetch pricing information from environment variables or API.""" # Check if pricing info is available via environment variables @@ -370,6 +423,12 @@ async def _fetch_pricing_info(self) -> _FetchedPricingInfoDict: max_total_charge_usd=self._configuration.max_total_charge_usd or Decimal('inf'), ) + def _get_event_price(self, event_name: str) -> Decimal: + pricing_info = self._pricing_info.get(event_name) + if pricing_info is not None: + return pricing_info.price + return Decimal(0) if self._is_at_home else Decimal(1) + @dataclass class ChargingStateItem: diff --git a/src/apify/storage_clients/_apify/_dataset_client.py b/src/apify/storage_clients/_apify/_dataset_client.py index a918bddd..5aa4c3d8 100644 --- a/src/apify/storage_clients/_apify/_dataset_client.py +++ b/src/apify/storage_clients/_apify/_dataset_client.py @@ -13,6 +13,7 @@ from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata from ._api_client_creation import create_storage_api_client +from apify.storage_clients._ppe_dataset_mixin import _DatasetClientPPEMixin if TYPE_CHECKING: from collections.abc import AsyncIterator @@ -25,7 +26,7 @@ logger = getLogger(__name__) -class ApifyDatasetClient(DatasetClient): +class ApifyDatasetClient(DatasetClient, _DatasetClientPPEMixin): """An Apify platform implementation of the dataset client.""" _MAX_PAYLOAD_SIZE = ByteSize.from_mb(9) @@ -48,6 +49,8 @@ def __init__( Preferably use the `ApifyDatasetClient.open` class method to create a new instance. """ + super().__init__() + self._api_client = api_client """The Apify dataset client for API operations.""" @@ -108,12 +111,16 @@ async def open( id=id, ) - return cls( + dataset_client = cls( api_client=api_client, api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 lock=asyncio.Lock(), ) + dataset_client.is_default_dataset = (await dataset_client.get_metadata()).id == configuration.default_dataset_id + + return dataset_client + @override async def purge(self) -> None: raise NotImplementedError( @@ -128,21 +135,19 @@ async def drop(self) -> None: @override async def push_data(self, data: list[Any] | dict[str, Any]) -> None: - async def payloads_generator() -> AsyncIterator[str]: - for index, item in enumerate(data): + async def payloads_generator(items: list[Any]) -> AsyncIterator[str]: + for index, item in enumerate(items): yield await self._check_and_serialize(item, index) async with self._lock: - # Handle lists - if isinstance(data, list): - # Invoke client in series to preserve the order of data - async for items in self._chunk_by_size(payloads_generator()): - await self._api_client.push_items(items=items) + items = data if isinstance(data, list) else [data] + limit = await self._calculate_limit_for_push(len(items)) + items = items[:limit] - # Handle singular items - else: - items = await self._check_and_serialize(data) - await self._api_client.push_items(items=items) + async for chunk in self._chunk_by_size(payloads_generator(items)): + await self._api_client.push_items(items=chunk) + + await self._charge_for_items(count_items=limit) @override async def get_data( diff --git a/src/apify/storage_clients/_file_system/_dataset_client.py b/src/apify/storage_clients/_file_system/_dataset_client.py new file mode 100644 index 00000000..e87534e0 --- /dev/null +++ b/src/apify/storage_clients/_file_system/_dataset_client.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from typing_extensions import Self, override + +from crawlee.storage_clients._file_system import FileSystemDatasetClient + +from apify._configuration import Configuration as ApifyConfiguration +from apify.storage_clients._ppe_dataset_mixin import _DatasetClientPPEMixin + +if TYPE_CHECKING: + from crawlee.configuration import Configuration + + +class ApifyFileSystemDatasetClient(FileSystemDatasetClient, _DatasetClientPPEMixin): + def __init__(self, *args: Any, **kwargs: Any) -> None: + FileSystemDatasetClient.__init__(self, *args, **kwargs) + _DatasetClientPPEMixin.__init__(self) + + @override + @classmethod + async def open( + cls, + *, + id: str | None, + name: str | None, + alias: str | None, + configuration: Configuration | ApifyConfiguration, + ) -> Self: + + dataset_client = await super().open( + id=id, + name=name, + alias=alias, + configuration=configuration, + ) + + if isinstance(configuration, ApifyConfiguration) and all(v is None for v in (id, name, alias)): + dataset_client.is_default_dataset = True + + return dataset_client + + @override + async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None: + items = data if isinstance(data, list) else [data] + limit = await self._calculate_limit_for_push(len(items)) + await super().push_data(items[:limit]) + await self._charge_for_items(limit) diff --git a/src/apify/storage_clients/_file_system/_storage_client.py b/src/apify/storage_clients/_file_system/_storage_client.py index 2b7134c7..f6cb66cc 100644 --- a/src/apify/storage_clients/_file_system/_storage_client.py +++ b/src/apify/storage_clients/_file_system/_storage_client.py @@ -7,6 +7,7 @@ from crawlee.configuration import Configuration from crawlee.storage_clients import FileSystemStorageClient +from ._dataset_client import ApifyFileSystemDatasetClient from ._key_value_store_client import ApifyFileSystemKeyValueStoreClient if TYPE_CHECKING: @@ -48,3 +49,20 @@ async def create_kvs_client( ) await self._purge_if_needed(client, configuration) return client + + @override + async def create_dataset_client( + self, + *, + id: str | None = None, + name: str | None = None, + alias: str | None = None, + configuration: Configuration | None = None, + ) -> ApifyFileSystemDatasetClient: + configuration = configuration or Configuration.get_global_configuration() + return await ApifyFileSystemDatasetClient.open( + id=id, + name=name, + alias=alias, + configuration=configuration, + ) diff --git a/src/apify/storage_clients/_ppe_dataset_mixin.py b/src/apify/storage_clients/_ppe_dataset_mixin.py new file mode 100644 index 00000000..973bfcf8 --- /dev/null +++ b/src/apify/storage_clients/_ppe_dataset_mixin.py @@ -0,0 +1,31 @@ +from apify._charging import DEFAULT_DATASET_ITEM_EVENT, charging_manager_ctx + + +class _DatasetClientPPEMixin: + """A mixin for dataset clients to add support for PPE pricing model and tracking synthetic events.""" + + def __init__(self) -> None: + self._is_default_dataset: bool = False + + @property + def is_default_dataset(self) -> bool: + return self._is_default_dataset + + @is_default_dataset.setter + def is_default_dataset(self, value: bool) -> None: + self._is_default_dataset = value + + async def _calculate_limit_for_push(self, items_count: int) -> int: + if self.is_default_dataset and (charging_manager := charging_manager_ctx.get()): + max_charged_count = charging_manager.calculate_max_event_charge_count_within_limit( + event_name=DEFAULT_DATASET_ITEM_EVENT + ) + return min(max_charged_count, items_count) if max_charged_count is not None else items_count + return items_count + + async def _charge_for_items(self, count_items: int) -> None: + if self.is_default_dataset and (charging_manager := charging_manager_ctx.get()): + await charging_manager.charge( + event_name=DEFAULT_DATASET_ITEM_EVENT, + count=count_items, + ) diff --git a/tests/e2e/test_actor_charge.py b/tests/e2e/test_actor_charge.py index d72062bc..0e2e98a0 100644 --- a/tests/e2e/test_actor_charge.py +++ b/tests/e2e/test_actor_charge.py @@ -20,6 +20,53 @@ from .conftest import MakeActorFunction, RunActorFunction +@pytest_asyncio.fixture(scope='module', loop_scope='module') +async def ppe_push_data_actor_build(make_actor: MakeActorFunction) -> str: + async def main() -> None: + async with Actor: + await Actor.push_data( + [{'id': i} for i in range(5)], + 'push-item', + ) + + actor_client = await make_actor('ppe-push-data', main_func=main) + + await actor_client.update( + pricing_infos=[ + { + 'pricingModel': 'PAY_PER_EVENT', + 'pricingPerEvent': { + 'actorChargeEvents': { + 'push-item': { + 'eventTitle': 'Push item', + 'eventPriceUsd': 0.05, + 'eventDescription': 'One pushed item', + }, + 'apify-default-dataset-item': { + 'eventTitle': 'Default dataset item', + 'eventPriceUsd': 0.05, + 'eventDescription': 'One item written to the default dataset', + }, + }, + }, + }, + ] + ) + + actor = await actor_client.get() + + assert actor is not None + return str(actor['id']) + + +@pytest_asyncio.fixture(scope='function', loop_scope='module') +async def ppe_push_data_actor( + ppe_push_data_actor_build: str, + apify_client_async: ApifyClientAsync, +) -> ActorClientAsync: + return apify_client_async.actor(ppe_push_data_actor_build) + + @pytest_asyncio.fixture(scope='module', loop_scope='module') async def ppe_actor_build(make_actor: MakeActorFunction) -> str: async def main() -> None: @@ -114,3 +161,58 @@ async def test_actor_charge_limit( except AssertionError: if is_last_attempt: raise + + +async def test_actor_push_data_charges_both_events( + ppe_push_data_actor: ActorClientAsync, + run_actor: RunActorFunction, + apify_client_async: ApifyClientAsync, +) -> None: + """Test that push_data charges both the explicit event and the synthetic apify-default-dataset-item event.""" + run = await run_actor(ppe_push_data_actor) + + # Refetch until the platform gets its act together + for is_last_attempt, _ in retry_counter(30): + await asyncio.sleep(1) + updated_run = await apify_client_async.run(run.id).get() + run = ActorRun.model_validate(updated_run) + + try: + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == { + 'push-item': 5, + 'apify-default-dataset-item': 5, + } + break + except AssertionError: + if is_last_attempt: + raise + + +async def test_actor_push_data_combined_budget_limit( + ppe_push_data_actor: ActorClientAsync, + run_actor: RunActorFunction, + apify_client_async: ApifyClientAsync, +) -> None: + """Test that push_data respects combined budget: explicit ($0.05) + synthetic ($0.05) = $0.10/item. + + With max_total_charge_usd=$0.20, only 2 of 5 items fit in the budget. + """ + run = await run_actor(ppe_push_data_actor, max_total_charge_usd=Decimal('0.20')) + + # Refetch until the platform gets its act together + for is_last_attempt, _ in retry_counter(30): + await asyncio.sleep(1) + updated_run = await apify_client_async.run(run.id).get() + run = ActorRun.model_validate(updated_run) + + try: + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == { + 'push-item': 2, + 'apify-default-dataset-item': 2, + } + break + except AssertionError: + if is_last_attempt: + raise diff --git a/tests/unit/actor/test_actor_charge.py b/tests/unit/actor/test_actor_charge.py index c9b14a88..3070aa4f 100644 --- a/tests/unit/actor/test_actor_charge.py +++ b/tests/unit/actor/test_actor_charge.py @@ -143,6 +143,49 @@ async def test_max_event_charge_count_within_limit_tolerates_overdraw() -> None: assert max_count == 0 +async def test_push_data_combined_price_limits_items() -> None: + """Test that push_data limits items when the combined explicit + synthetic event price exceeds the budget.""" + async with setup_mocked_charging( + Configuration(max_total_charge_usd=Decimal('3.00'), test_pay_per_event=True) + ) as setup: + setup.charging_mgr._pricing_info['scrape'] = PricingInfoItem(Decimal('1.00'), 'Scrape') + setup.charging_mgr._pricing_info['apify-default-dataset-item'] = PricingInfoItem( + Decimal('1.00'), 'Default dataset item' + ) + + data = [{'id': i} for i in range(5)] + result = await Actor.push_data(data, 'scrape') + + assert result is not None + assert result.charged_count == 1 + + dataset = await Actor.open_dataset() + items = await dataset.get_data() + assert len(items.items) == 1 + assert items.items[0] == {'id': 0} + + +async def test_push_data_charges_synthetic_event_for_default_dataset() -> None: + """Test that push_data charges both the explicit event and the synthetic apify-default-dataset-item event.""" + async with setup_mocked_charging( + Configuration(max_total_charge_usd=Decimal('10.00'), test_pay_per_event=True) + ) as setup: + setup.charging_mgr._pricing_info['test'] = PricingInfoItem(Decimal('0.10'), 'Test') + setup.charging_mgr._pricing_info['apify-default-dataset-item'] = PricingInfoItem( + Decimal('0.05'), 'Dataset item' + ) + + data = [{'id': i} for i in range(3)] + result = await Actor.push_data(data, 'test') + + assert result is not None + assert result.charged_count == 3 + + # Both explicit and synthetic events should be charged + assert setup.charging_mgr.get_charged_event_count('test') == 3 + assert setup.charging_mgr.get_charged_event_count('apify-default-dataset-item') == 3 + + async def test_charge_with_overdrawn_budget() -> None: configuration = Configuration( max_total_charge_usd=Decimal('0.00025'), diff --git a/tests/unit/actor/test_charging_manager.py b/tests/unit/actor/test_charging_manager.py index 4a1ef480..94f31759 100644 --- a/tests/unit/actor/test_charging_manager.py +++ b/tests/unit/actor/test_charging_manager.py @@ -247,6 +247,78 @@ async def test_get_max_total_charge_usd(mock_client: MagicMock) -> None: assert cm.get_max_total_charge_usd() == Decimal('42.50') +async def test_calculate_push_data_limit_no_ppe(mock_client: MagicMock) -> None: + """Returns items_count when no PPE pricing is configured (prices are zero).""" + config = _make_config(actor_pricing_info=None, charged_event_counts={}) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + result = cm.calculate_push_data_limit(10, 'some-event', is_default_dataset=True) + assert result == 10 + + +async def test_calculate_push_data_limit_within_budget(mock_client: MagicMock) -> None: + """Returns full items_count when combined budget is sufficient for all items.""" + pricing_info = _make_ppe_pricing_info({'click': Decimal('0.01'), 'apify-default-dataset-item': Decimal('0.01')}) + config = _make_config( + test_pay_per_event=True, + actor_pricing_info=pricing_info, + charged_event_counts={}, + max_total_charge_usd=Decimal('10.00'), + ) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + # combined price = 0.02/item, budget = 10.00, max = 500 + result = cm.calculate_push_data_limit(5, 'click', is_default_dataset=True) + assert result == 5 + + +async def test_calculate_push_data_limit_budget_exceeded(mock_client: MagicMock) -> None: + """Returns capped count when combined price (explicit + synthetic) exceeds budget.""" + pricing_info = _make_ppe_pricing_info({'scrape': Decimal('1.00'), 'apify-default-dataset-item': Decimal('1.00')}) + config = _make_config( + test_pay_per_event=True, + actor_pricing_info=pricing_info, + charged_event_counts={}, + max_total_charge_usd=Decimal('3.00'), + ) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + # combined price = 2.00/item, budget = 3.00, max = floor(3/2) = 1 + result = cm.calculate_push_data_limit(5, 'scrape', is_default_dataset=True) + assert result == 1 + + +async def test_calculate_push_data_limit_without_default_dataset(mock_client: MagicMock) -> None: + """When not pushing to the default dataset, only explicit event price is considered.""" + pricing_info = _make_ppe_pricing_info({'scrape': Decimal('1.00'), 'apify-default-dataset-item': Decimal('1.00')}) + config = _make_config( + test_pay_per_event=True, + actor_pricing_info=pricing_info, + charged_event_counts={}, + max_total_charge_usd=Decimal('3.00'), + ) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + # explicit price only = 1.00/item, budget = 3.00, max = floor(3/1) = 3 + result = cm.calculate_push_data_limit(5, 'scrape', is_default_dataset=False) + assert result == 3 + + +async def test_calculate_push_data_limit_exhausted_budget(mock_client: MagicMock) -> None: + """Returns 0 when the budget is fully exhausted before the push.""" + pricing_info = _make_ppe_pricing_info({'scrape': Decimal('1.00')}) + config = _make_config( + test_pay_per_event=True, + actor_pricing_info=pricing_info, + charged_event_counts={'scrape': 3}, + max_total_charge_usd=Decimal('3.00'), + ) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + result = cm.calculate_push_data_limit(5, 'scrape', is_default_dataset=False) + assert result == 0 + + async def test_charge_limit_reached(mock_client: MagicMock) -> None: """Test that event_charge_limit_reached is True when budget is exhausted.""" pricing_info = _make_ppe_pricing_info({'search': Decimal('5.00')})