Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,9 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non

data = data if isinstance(data, list) else [data]

if charged_event_name and charged_event_name.startswith('apify-'):
raise ValueError(f'Cannot charge for synthetic event "{charged_event_name}" manually')

# No charging, just push the data without locking.
if charged_event_name is None:
dataset = await self.open_dataset()
Expand All @@ -637,20 +640,20 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non
# If charging is requested, acquire the charge lock to prevent race conditions between concurrent
# push_data calls. We need to hold the lock for the entire push_data + charge sequence.
async with self._charge_lock:
max_charged_count = self.get_charging_manager().calculate_max_event_charge_count_within_limit(
charged_event_name
pushed_items_count = self.get_charging_manager().calculate_push_data_limit(
items_count=len(data),
event_name=charged_event_name,
is_default_dataset=True,
)

# Push as many items as we can charge for.
pushed_items_count = min(max_charged_count, len(data)) if max_charged_count is not None else len(data)

dataset = await self.open_dataset()

if pushed_items_count < len(data):
await dataset.push_data(data[:pushed_items_count])
elif pushed_items_count > 0:
await dataset.push_data(data)

# Only charge explicit events; synthetic events will be processed within the client.
return await self.get_charging_manager().charge(
event_name=charged_event_name,
count=pushed_items_count,
Expand Down
77 changes: 68 additions & 9 deletions src/apify/_charging.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import math
from contextvars import ContextVar
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
Expand Down Expand Up @@ -31,6 +32,14 @@

run_validator = TypeAdapter[ActorRun | None](ActorRun | None)

DEFAULT_DATASET_ITEM_EVENT = 'apify-default-dataset-item'

# Context variable to hold the current `ChargingManager` instance, if any. This allows PPE-aware dataset clients to
# access the charging manager without needing to pass it explicitly.
charging_manager_ctx: ContextVar[ChargingManagerImplementation | None] = ContextVar(
'charging_manager_ctx', default=None
)


@docs_group('Charging')
class ChargingManager(Protocol):
Expand Down Expand Up @@ -81,6 +90,28 @@ def get_charged_event_count(self, event_name: str) -> int:
def get_max_total_charge_usd(self) -> Decimal:
"""Get the configured maximum total charge for this Actor run."""

def calculate_push_data_limit(
    self,
    items_count: int,
    event_name: str,
    *,
    is_default_dataset: bool,
) -> int:
    """Calculate how many items can be pushed and charged within the current budget.

    Accounts for both the explicit event and the synthetic `DEFAULT_DATASET_ITEM_EVENT` event,
    so that the combined cost per item does not exceed the remaining budget.

    Args:
        items_count: The number of items to be pushed.
        event_name: The explicit event name to charge for each item.
        is_default_dataset: Whether the data is pushed to the default dataset.
            If True, the synthetic event cost is included in the combined price.

    Returns:
        Max number of items that can be pushed within the budget. Never exceeds
        `items_count`; equals `items_count` when the combined per-item price is zero
        (nothing to charge) or the budget is unlimited.
    """


@docs_group('Charging')
@dataclass(frozen=True)
Expand Down Expand Up @@ -190,6 +221,11 @@ async def __aenter__(self) -> None:

self._charging_log_dataset = await Dataset.open(name=self.LOCAL_CHARGING_LOG_DATASET_NAME)

# if the Actor runs with the pay-per-event pricing model, set the context variable so that PPE-aware dataset
# clients can access the charging manager and charge for synthetic events.
if self._pricing_model == 'PAY_PER_EVENT':
charging_manager_ctx.set(self)

async def __aexit__(
self,
exc_type: type[BaseException] | None,
Expand All @@ -199,6 +235,7 @@ async def __aexit__(
if not self.active:
raise RuntimeError('Exiting an uninitialized ChargingManager')

charging_manager_ctx.set(None)
self.active = False

@ensure_context
Expand Down Expand Up @@ -258,7 +295,11 @@ def calculate_chargeable() -> dict[str, int | None]:
if self._actor_run_id is None:
raise RuntimeError('Actor run ID not configured')

if event_name in self._pricing_info:
if event_name.startswith('apify-'):
# Synthetic events (e.g. apify-default-dataset-item) are tracked internally only,
# the platform handles them automatically based on dataset writes.
pass
elif event_name in self._pricing_info:
await self._client.run(self._actor_run_id).charge(event_name, charged_count)
else:
logger.warning(f"Attempting to charge for an unknown event '{event_name}'")
Expand Down Expand Up @@ -300,14 +341,7 @@ def calculate_total_charged_amount(self) -> Decimal:

@ensure_context
def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int | None:
pricing_info = self._pricing_info.get(event_name)

if pricing_info is not None:
price = pricing_info.price
elif not self._is_at_home:
price = Decimal(1) # Use a nonzero price for local development so that the maximum budget can be reached
else:
price = Decimal()
price = self._get_event_price(event_name)

if not price:
return None
Expand Down Expand Up @@ -337,6 +371,25 @@ def get_charged_event_count(self, event_name: str) -> int:
def get_max_total_charge_usd(self) -> Decimal:
return self._max_total_charge_usd

@ensure_context
def calculate_push_data_limit(
    self,
    items_count: int,
    event_name: str,
    *,
    is_default_dataset: bool,
) -> int:
    """Return how many of `items_count` items fit within the remaining charging budget.

    The per-item cost combines the explicit event price with the synthetic
    default-dataset-item price (when pushing to the default dataset), so that
    pushing an item never charges past the configured maximum.
    """
    per_item_price = self._get_event_price(event_name)
    if is_default_dataset:
        per_item_price += self._get_event_price(DEFAULT_DATASET_ITEM_EVENT)

    # Free items are not limited by the budget at all.
    if not per_item_price:
        return items_count

    remaining_budget = self._max_total_charge_usd - self.calculate_total_charged_amount()
    affordable = remaining_budget / per_item_price

    # An unlimited budget (`max_total_charge_usd == Decimal('inf')`) produces a
    # non-finite quotient, which means there is effectively no limit.
    if not affordable.is_finite():
        return items_count

    return min(items_count, max(0, math.floor(affordable)))

async def _fetch_pricing_info(self) -> _FetchedPricingInfoDict:
"""Fetch pricing information from environment variables or API."""
# Check if pricing info is available via environment variables
Expand Down Expand Up @@ -370,6 +423,12 @@ async def _fetch_pricing_info(self) -> _FetchedPricingInfoDict:
max_total_charge_usd=self._configuration.max_total_charge_usd or Decimal('inf'),
)

def _get_event_price(self, event_name: str) -> Decimal:
    """Return the per-item price of an event, with a fallback for unknown events.

    Unknown events are free on the Apify platform; during local development a
    nonzero price is used instead so that the maximum budget can be reached.
    """
    try:
        return self._pricing_info[event_name].price
    except KeyError:
        return Decimal(1) if not self._is_at_home else Decimal(0)


@dataclass
class ChargingStateItem:
Expand Down
31 changes: 18 additions & 13 deletions src/apify/storage_clients/_apify/_dataset_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata

from ._api_client_creation import create_storage_api_client
from apify.storage_clients._ppe_dataset_mixin import _DatasetClientPPEMixin

if TYPE_CHECKING:
from collections.abc import AsyncIterator
Expand All @@ -25,7 +26,7 @@
logger = getLogger(__name__)


class ApifyDatasetClient(DatasetClient):
class ApifyDatasetClient(DatasetClient, _DatasetClientPPEMixin):
"""An Apify platform implementation of the dataset client."""

_MAX_PAYLOAD_SIZE = ByteSize.from_mb(9)
Expand All @@ -48,6 +49,8 @@ def __init__(

Preferably use the `ApifyDatasetClient.open` class method to create a new instance.
"""
super().__init__()

self._api_client = api_client
"""The Apify dataset client for API operations."""

Expand Down Expand Up @@ -108,12 +111,16 @@ async def open(
id=id,
)

return cls(
dataset_client = cls(
api_client=api_client,
api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635
lock=asyncio.Lock(),
)

dataset_client.is_default_dataset = (await dataset_client.get_metadata()).id == configuration.default_dataset_id

return dataset_client

@override
async def purge(self) -> None:
raise NotImplementedError(
Expand All @@ -128,21 +135,19 @@ async def drop(self) -> None:

@override
async def push_data(self, data: list[Any] | dict[str, Any]) -> None:
async def payloads_generator() -> AsyncIterator[str]:
for index, item in enumerate(data):
async def payloads_generator(items: list[Any]) -> AsyncIterator[str]:
for index, item in enumerate(items):
yield await self._check_and_serialize(item, index)

async with self._lock:
# Handle lists
if isinstance(data, list):
# Invoke client in series to preserve the order of data
async for items in self._chunk_by_size(payloads_generator()):
await self._api_client.push_items(items=items)
items = data if isinstance(data, list) else [data]
limit = await self._calculate_limit_for_push(len(items))
items = items[:limit]

# Handle singular items
else:
items = await self._check_and_serialize(data)
await self._api_client.push_items(items=items)
async for chunk in self._chunk_by_size(payloads_generator(items)):
await self._api_client.push_items(items=chunk)

await self._charge_for_items(count_items=limit)

@override
async def get_data(
Expand Down
49 changes: 49 additions & 0 deletions src/apify/storage_clients/_file_system/_dataset_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from typing_extensions import Self, override

from crawlee.storage_clients._file_system import FileSystemDatasetClient

from apify._configuration import Configuration as ApifyConfiguration
from apify.storage_clients._ppe_dataset_mixin import _DatasetClientPPEMixin

if TYPE_CHECKING:
from crawlee.configuration import Configuration


class ApifyFileSystemDatasetClient(FileSystemDatasetClient, _DatasetClientPPEMixin):
    """File-system dataset client aware of the pay-per-event (PPE) pricing model.

    Behaves like `FileSystemDatasetClient`, except that pushes to the default
    dataset are trimmed to what the charging budget allows and are charged via
    the synthetic default-dataset-item event.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Both bases are initialized explicitly; the mixin only sets up its own state.
        FileSystemDatasetClient.__init__(self, *args, **kwargs)
        _DatasetClientPPEMixin.__init__(self)

    @override
    @classmethod
    async def open(
        cls,
        *,
        id: str | None,
        name: str | None,
        alias: str | None,
        configuration: Configuration | ApifyConfiguration,
    ) -> Self:
        client = await super().open(
            id=id,
            name=name,
            alias=alias,
            configuration=configuration,
        )

        # The default dataset is the one opened with no identifier at all under an Apify configuration.
        opened_without_identifier = id is None and name is None and alias is None
        if opened_without_identifier and isinstance(configuration, ApifyConfiguration):
            client.is_default_dataset = True

        return client

    @override
    async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None:
        item_list = [data] if isinstance(data, dict) else data
        # Trim the batch to what the remaining budget can pay for, then charge for it.
        pushable_count = await self._calculate_limit_for_push(len(item_list))
        await super().push_data(item_list[:pushable_count])
        await self._charge_for_items(pushable_count)
18 changes: 18 additions & 0 deletions src/apify/storage_clients/_file_system/_storage_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from crawlee.configuration import Configuration
from crawlee.storage_clients import FileSystemStorageClient

from ._dataset_client import ApifyFileSystemDatasetClient
from ._key_value_store_client import ApifyFileSystemKeyValueStoreClient

if TYPE_CHECKING:
Expand Down Expand Up @@ -48,3 +49,20 @@ async def create_kvs_client(
)
await self._purge_if_needed(client, configuration)
return client

@override
async def create_dataset_client(
    self,
    *,
    id: str | None = None,
    name: str | None = None,
    alias: str | None = None,
    configuration: Configuration | None = None,
) -> ApifyFileSystemDatasetClient:
    """Open a PPE-aware file-system dataset client, defaulting to the global configuration."""
    effective_configuration = configuration or Configuration.get_global_configuration()
    return await ApifyFileSystemDatasetClient.open(
        id=id,
        name=name,
        alias=alias,
        configuration=effective_configuration,
    )
31 changes: 31 additions & 0 deletions src/apify/storage_clients/_ppe_dataset_mixin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from apify._charging import DEFAULT_DATASET_ITEM_EVENT, charging_manager_ctx


class _DatasetClientPPEMixin:
    """A mixin for dataset clients to add support for PPE pricing model and tracking synthetic events."""

    def __init__(self) -> None:
        # Only the run's default dataset triggers synthetic per-item charging.
        self._is_default_dataset: bool = False

    @property
    def is_default_dataset(self) -> bool:
        """Whether this client wraps the Actor run's default dataset."""
        return self._is_default_dataset

    @is_default_dataset.setter
    def is_default_dataset(self, value: bool) -> None:
        self._is_default_dataset = value

    async def _calculate_limit_for_push(self, items_count: int) -> int:
        """Return how many of `items_count` items the charging budget still allows."""
        if not self.is_default_dataset:
            return items_count

        manager = charging_manager_ctx.get()
        if manager is None:
            # Not running under the pay-per-event pricing model — no limit applies.
            return items_count

        max_chargeable = manager.calculate_max_event_charge_count_within_limit(
            event_name=DEFAULT_DATASET_ITEM_EVENT
        )
        if max_chargeable is None:
            return items_count
        return min(items_count, max_chargeable)

    async def _charge_for_items(self, count_items: int) -> None:
        """Charge the synthetic default-dataset-item event for the pushed items."""
        if not self.is_default_dataset:
            return

        manager = charging_manager_ctx.get()
        if manager is not None:
            await manager.charge(
                event_name=DEFAULT_DATASET_ITEM_EVENT,
                count=count_items,
            )
Loading
Loading