Skip to content

Commit 5590c3b

Browse files
Obstore workflow cache (#28)
* Obstore workflow cache
* Fix issue with directory paths in local store
1 parent e726079 commit 5590c3b

3 files changed

Lines changed: 144 additions & 135 deletions

File tree

tilebox-workflows/tests/test_cache.py

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -8,8 +8,9 @@
88
from _pytest.fixtures import SubRequest
99
from moto import mock_aws
1010
from mypy_boto3_s3 import S3Client
11+
from obstore.store import LocalStore, MemoryStore
1112

12-
from tilebox.workflows.cache import AmazonS3Cache, InMemoryCache, JobCache, LocalFileSystemCache
13+
from tilebox.workflows.cache import AmazonS3Cache, InMemoryCache, JobCache, LocalFileSystemCache, ObstoreCache
1314

1415

1516
@pytest.fixture
@@ -30,7 +31,7 @@ def aws(_aws_credentials: None) -> Iterator[S3Client]:
3031
yield boto3.client("s3", region_name="us-east-1")
3132

3233

33-
caches = ["LocalFileSystem", "InMemory", "AmazonS3", "AmazonS3_no_prefix"]
34+
caches = ["LocalFileSystem", "InMemory", "AmazonS3", "AmazonS3_no_prefix", "ObstoreMemory", "ObstoreLocal"]
3435

3536

3637
@pytest.fixture
@@ -48,6 +49,10 @@ def cache(request: SubRequest, tmp_path: Path, aws: S3Client) -> JobCache:
4849
bucket = "bucket1"
4950
aws.create_bucket(Bucket=bucket)
5051
return AmazonS3Cache(bucket, prefix="")
52+
case "ObstoreMemory":
53+
return ObstoreCache(MemoryStore())
54+
case "ObstoreLocal":
55+
return ObstoreCache(LocalStore(tmp_path))
5156
case _:
5257
raise ValueError("Invalid cache type")
5358

tilebox-workflows/tilebox/workflows/cache.py

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@
1010
from botocore.exceptions import ClientError
1111
from google.cloud.exceptions import NotFound
1212
from google.cloud.storage import Blob, Bucket
13+
from obstore.exceptions import GenericError
1314
from obstore.store import ObjectStore
1415

1516

@@ -100,11 +101,14 @@ def __getitem__(self, key: str) -> bytes:
100101
try:
101102
entry = self.store.get(str(self.prefix / key))
102103
return bytes(entry.bytes())
103-
except OSError:
104+
except (OSError, GenericError):
105+
# GenericError is raised if the key contains separator characters, but one of the parents is a file
106+
# instead of a directory
104107
raise KeyError(f"{key} is not cached!") from None
105108

106109
def __iter__(self) -> Iterator[str]:
107-
for obj in self.store.list_with_delimiter(str(self.prefix))["objects"]:
110+
prefix = "" if self.prefix == ObjectPath(".") else str(self.prefix)
111+
for obj in self.store.list_with_delimiter(prefix)["objects"]:
108112
path: str = obj["path"]
109113
yield path.removeprefix(str(self.prefix) + "/")
110114

0 commit comments

Comments
 (0)