-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsegment.py
More file actions
76 lines (64 loc) · 2.76 KB
/
segment.py
File metadata and controls
76 lines (64 loc) · 2.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import os
import glob
import json
import datetime
from dotenv import load_dotenv
from BloomFilter import build_bloom_from_json
load_dotenv()
class Segment:
    """On-disk JSON segment manager for a simple LSM-style key/value store.

    Segments live under SEGMENTS_PATH (env var, default "./segments"). Two
    on-disk formats are supported:
      * legacy: a plain dict ``{key: value}`` (treated as ts=0, no tombstones)
      * current: a list of entry dicts ``{"key", "value", "tombstone", "ts"}``
    """

    def __init__(self):
        # SEGMENTS_PATH may start with "~"; expand it once up front.
        self.segment_file_path = os.path.expanduser(os.getenv("SEGMENTS_PATH", "./segments"))
        # THRESHOLD is the approximate serialized size (characters of the JSON
        # dump) at which merged output is split into a new segment file.
        self.threshold = int(os.getenv("THRESHOLD", 100))
        self.create_ws()

    def create_ws(self):
        """Ensure the segment directory exists and return its expanded path.

        ``expanduser`` is idempotent, so re-expanding the already-expanded
        ``__init__`` path is harmless and also covers externally-set paths.
        """
        expanded_path = os.path.expanduser(self.segment_file_path)
        os.makedirs(expanded_path, exist_ok=True)
        return expanded_path

    def get_segment_files(self):
        """Return the paths of all JSON segment files (order unspecified)."""
        return glob.glob(os.path.join(self.segment_file_path, "*.json"))

    @staticmethod
    def _normalize(data):
        """Convert legacy dict-format segment data to the entry-list format."""
        if isinstance(data, dict):
            return [{"key": k, "value": v, "tombstone": False, "ts": 0}
                    for k, v in data.items()]
        return data

    def _write_segment(self, entries, index):
        """Write one merged segment file and return its path.

        ``index`` keeps filenames unique when several chunks are written
        within the same millisecond (the previous timestamp-only naming
        could silently overwrite an earlier chunk).
        """
        now = int(datetime.datetime.now().timestamp() * 1000)
        output_file = os.path.join(self.segment_file_path, f"segment_{now}_{index}.json")
        with open(output_file, "w") as out:
            json.dump(entries, out, indent=2)
        return output_file

    def merge_segments(self):
        """Compact all segment files; return the list of files written.

        Per key, the entry with the highest ``ts`` wins; tombstoned entries
        are dropped from the output. All input segment files are deleted
        afterwards.

        Fix vs. the previous version: merging now completes across ALL input
        segments before anything is flushed. The old code flushed and reset
        the merge map mid-loop whenever it exceeded the threshold, so an
        older version of an already-flushed key appearing in a later input
        file was written again to a newer output file, breaking latest-wins.
        (A side effect of the fix: no empty ``[]`` segment is written when
        every surviving entry is a tombstone.)
        """
        json_files = sorted(self.get_segment_files())
        merged = {}
        for segment_file in json_files:
            with open(segment_file, "r") as f:
                data = self._normalize(json.load(f))
            for entry in data:
                k, ts = entry["key"], entry.get("ts", 0)
                # .get() on the stored entry too: a previously merged entry
                # without "ts" used to raise KeyError here.
                if k not in merged or merged[k].get("ts", 0) < ts:
                    merged[k] = entry
        # Drop deletions, then split the survivors into threshold-sized files.
        cleaned = [v for v in merged.values() if not v.get("tombstone", False)]
        output_files = []
        chunk = []
        for entry in cleaned:
            chunk.append(entry)
            if len(json.dumps(chunk)) > self.threshold:
                output_files.append(self._write_segment(chunk, len(output_files)))
                chunk = []
        if chunk:
            output_files.append(self._write_segment(chunk, len(output_files)))
        for seg in json_files:
            os.remove(seg)
        return output_files

    def search_in_json_segments(self, key):
        """Return the stored record for ``key``, or None if absent.

        A per-segment Bloom filter skips files that cannot contain the key
        (Bloom filters have no false negatives, so skipping is safe).

        Return shape depends on the segment format: legacy dict segments
        yield the raw value, list segments yield the full entry dict —
        including tombstoned entries. NOTE(review): segments are scanned in
        unspecified order, so with multiple un-compacted segments this may
        return a stale version of the key — run merge_segments() first, or
        confirm callers tolerate this.
        """
        for segment_file in self.get_segment_files():
            bf = build_bloom_from_json(segment_file)
            if not bf.check(key):
                continue
            with open(segment_file, "r") as f:
                data = json.load(f)
            if isinstance(data, dict):
                if key in data:
                    return data[key]
            elif isinstance(data, list):
                for entry in data:
                    # .get(): a malformed entry without "key" used to raise.
                    if entry.get("key") == key:
                        return entry
        return None