-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathsdk_interface.py
More file actions
3409 lines (2871 loc) · 123 KB
/
sdk_interface.py
File metadata and controls
3409 lines (2871 loc) · 123 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import copy
import io
import json
import logging
import os
import sys
import uuid
from pathlib import Path
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple
from typing import TypeVar
from typing import Union
from typing_extensions import Literal
if sys.version_info < (3, 11):
from typing_extensions import TypedDict, NotRequired, Required # noqa
else:
from typing import TypedDict, NotRequired, Required # noqa
import boto3
from tqdm import tqdm
from superannotate_core.core.conditions import CONDITION_EQ as EQ
from superannotate_core.infrastructure.repositories.item_repository import Attachment
from superannotate_core.core.conditions import Condition
from superannotate_core.core.conditions import EmptyCondition
from superannotate_core.core.enums import FolderStatus
from superannotate_core.core.enums import ClassTypeEnum
from superannotate_core.core.enums import AnnotationStatus
from superannotate_core.core.enums import ProjectType
from superannotate_core.core.enums import ApprovalStatus
from superannotate_core.core.entities import AttributeGroupSchema
from superannotate_core.core.entities import AnnotationClassEntity
from superannotate_core.core.exceptions import SAException
import lib.core as constants
from lib.app.helpers import get_annotation_paths
from lib.app.helpers import get_name_url_duplicated_from_csv
from lib.app.helpers import wrap_error as wrap_validation_errors
from lib.app.interface.base_interface import BaseInterfaceFacade
from lib.app.interface.base_interface import TrackableMeta
from lib.app.interface.types import EmailStr
from lib.app.serializers import BaseSerializer
from lib.app.serializers import ProjectSerializer
from lib.app.serializers import SettingsSerializer
from lib.app.serializers import TeamSerializer
from lib.core import LIMITED_FUNCTIONS
from lib.core import entities
from lib.core.entities import WorkflowEntity
from lib.core.entities import SettingEntity
from lib.core.entities.integrations import IntegrationEntity
from lib.core.entities.integrations import IntegrationTypeEnum
from lib.core.enums import ImageQuality
from lib.core.exceptions import AppException
from lib.core.types import MLModel
from lib.core.pydantic_v1 import constr
from lib.core.pydantic_v1 import conlist
from lib.core.pydantic_v1 import parse_obj_as
from lib.infrastructure.utils import extract_project_folder
from superannotate_core.core.entities import BaseItemEntity
from superannotate_core.app import Project, Folder
def serialize_item(entity: BaseItemEntity, project: Project, folder: Folder = None):
    """Clear the item fields that do not apply to the given project.

    The entity is modified in place and returned.

    :param entity: item entity to adjust
    :param project: project the item belongs to; its ``upload_state`` and
        ``type`` decide which fields are reset to None
    :param folder: optional folder of the item; when given, ``entity.path``
        is set to the project name, followed by ``/<folder name>`` unless the
        folder is named ``root``
    :return: the same ``entity`` instance
    """
    is_external = project.upload_state == constants.UploadState.EXTERNAL

    # the url is only kept for externally attached items
    if not is_external:
        entity.url = None

    if project.type in constants.ProjectType.images:
        if project.type == constants.ProjectType.VECTOR:
            entity.segmentation_status = None
        if is_external:
            entity.prediction_status = None
            entity.segmentation_status = None

    if folder:
        entity.path = (
            f"{project.name}{f'/{folder.name}' if folder.name != 'root' else ''}"
        )
    return entity
# Module-level logger, registered under the name "sa".
logger = logging.getLogger("sa")
# Type variable bound to a strict string with at least one character;
# used to annotate name-like arguments of the client methods.
NotEmptyStr = TypeVar("NotEmptyStr", bound=constr(strict=True, min_length=1))
# Literal types enumerating the string values accepted by the client methods.
# Project status names (see search_projects / set_project_status).
PROJECT_STATUS = Literal["NotStarted", "InProgress", "Completed", "OnHold"]
# Project type names (see create_project).
PROJECT_TYPE = Literal[
    "Vector",
    "Pixel",
    "Video",
    "Document",
    "Tiled",
    "PointCloud",
    "GenAI",
]
# Annotation status names.
ANNOTATION_STATUS = Literal[
    "NotStarted", "InProgress", "QualityCheck", "Returned", "Completed", "Skipped"
]
# Approval status names; None is an accepted member of this literal.
APPROVAL_STATUS = Literal["Approved", "Disapproved", None]
# Image quality names.
IMAGE_QUALITY = Literal["compressed", "original"]
# Annotation type names.
ANNOTATION_TYPE = Literal["bbox", "polygon", "point", "tag"]
# Role names.
ANNOTATOR_ROLE = Literal["Admin", "Annotator", "QA"]
# Folder status names (see search_folders / set_folder_status).
FOLDER_STATUS = Literal["NotStarted", "InProgress", "Completed", "OnHold"]
class Setting(TypedDict):
    """Typed dictionary describing one project setting entry."""

    # name of the setting
    attribute: str
    # value assigned to the setting
    value: Union[str, float, int]
class PriorityScore(TypedDict):
    """Typed dictionary pairing a name with a priority value."""

    # presumably the name of the item the score applies to - TODO confirm
    name: str
    # priority value for that name
    priority: float
class SAClient(BaseInterfaceFacade, metaclass=TrackableMeta):
"""Create SAClient instance to authorize SDK in a team scope.
If no argument is provided, the SA_TOKEN environment variable
will be checked or $HOME/.superannotate/config.json will be used.
:param token: team token
:type token: str
:param config_path: path to config file
:type config_path: path-like (str or Path)
"""
def __init__(
    self,
    token: str = None,
    config_path: str = None,
):
    """Initialise the client by forwarding both arguments to the base interface.

    :param token: team token
    :type token: str

    :param config_path: path to config file
    :type config_path: path-like (str or Path)
    """
    super().__init__(token, config_path)
def get_project_by_id(self, project_id: int):
    """Look a project up by its id and return it as a dictionary.

    :param project_id: the id of the project
    :type project_id: int

    :return: project metadata
    :rtype: dict
    """
    found_project = Project.get_by_id(self.session, project_id)
    return found_project.dict()
def get_folder_by_id(self, project_id: int, folder_id: int):
    """Look a folder up by project id and folder id and return it as a dictionary.

    :param project_id: the id of the project
    :type project_id: int

    :param folder_id: the id of the folder
    :type folder_id: int

    :return: folder metadata
    :rtype: dict
    """
    found_folder = Folder.get_by_id(self.session, project_id, folder_id)
    return found_folder.dict()
def get_item_by_id(self, project_id: int, item_id: int):
    """Fetch a single item of a project and return it as a dictionary.

    The item is passed through ``serialize_item`` and the ``url`` and
    ``meta`` fields are left out of the result.

    :param project_id: the id of the project
    :type project_id: int

    :param item_id: the id of the item
    :type item_id: int

    :return: item metadata
    :rtype: dict
    """
    project = self.controller.get_project(pk=project_id)
    item = project.get_item(pk=item_id)
    serialized = serialize_item(item, project)
    return serialized.dict(exclude={"url", "meta"})
def get_team_metadata(self):
    """Return the serialized metadata of the current team.

    :return: team metadata
    :rtype: dict
    """
    team = self.controller.get_team().data
    return TeamSerializer(team).serialize()
def search_team_contributors(
    self,
    email: EmailStr = None,
    first_name: NotEmptyStr = None,
    last_name: NotEmptyStr = None,
    return_metadata: bool = True,
):
    """Search for contributors in the team.

    :param email: filter by email
    :type email: str

    :param first_name: filter by first name
    :type first_name: str

    :param last_name: filter by last name
    :type last_name: str

    :param return_metadata: if True, return the contributors' metadata;
        otherwise return only their emails
    :type return_metadata: bool

    :return: metadata of the found contributors, or their emails
    :rtype: list of dicts or list of strs
    """
    response = self.controller.search_team_contributors(
        email=email, first_name=first_name, last_name=last_name
    )
    found = response.data
    if return_metadata:
        return found
    return [entry["email"] for entry in found]
def search_projects(
    self,
    name: Optional[NotEmptyStr] = None,
    return_metadata: bool = False,
    include_complete_item_count: bool = False,
    status: Optional[Union[PROJECT_STATUS, List[PROJECT_STATUS]]] = None,
):
    """Search the team's projects.

    If **name** is None, no name filter is applied.

    :param name: search string
    :type name: str

    :param return_metadata: return metadata of projects instead of names
    :type return_metadata: bool

    :param include_complete_item_count: when True, a ``completeImagesCount``
        filter is added to the query
    :type include_complete_item_count: bool

    :param status: a single project status or a collection of statuses to
        filter by
    :type status: str or list of str

    :return: project names or metadatas
    :rtype: list of strs or dicts

    :raises AppException: if the listing request reports errors
    """
    # normalise ``status`` to a list so a single value and a collection share one code path
    if not status:
        requested_statuses = []
    elif isinstance(status, (list, tuple, set)):
        requested_statuses = list(status)
    else:
        requested_statuses = [status]

    condition = Condition.get_empty_condition()
    if name:
        condition &= Condition("name", name, EQ)
    if include_complete_item_count:
        condition &= Condition(
            "completeImagesCount", include_complete_item_count, EQ
        )
    for status_name in requested_statuses:
        condition &= Condition(
            "status", constants.ProjectStatus.get_value(status_name), EQ
        )

    response = self.controller.projects.list(condition)
    if response.errors:
        raise AppException(response.errors)

    if not return_metadata:
        return [project.name for project in response.data]
    # NOTE(review): the field set is passed positionally to ``dict`` - confirm
    # whether the entity treats it as include or exclude.
    return [
        project.dict(
            {
                "settings",
                "workflows",
                "contributors",
                "classes",
                "item_count",
            }
        )
        for project in response.data
    ]
def create_project(
    self,
    project_name: NotEmptyStr,
    project_description: NotEmptyStr,
    project_type: PROJECT_TYPE,
    settings: List[Setting] = None,
    # fix validation for class
    # classes: List[AnnotationClassEntity] = None,
    classes: List = None,
    workflows: List = None,
    instructions_link: str = None,
):
    """Create a new project in the team.

    :param project_name: the new project's name
    :type project_name: str

    :param project_description: the new project's description
    :type project_description: str

    :param project_type: the new project type, Vector, Pixel, Video, Document, Tiled, PointCloud, GenAI.
    :type project_type: str

    :param settings: list of settings objects
    :type settings: list of dicts

    :param classes: list of class objects
    :type classes: list of dicts

    :param workflows: list of information for each step; only accepted for
        Vector and Pixel projects and only together with ``classes``
    :type workflows: list of dicts

    :param instructions_link: str of instructions URL
    :type instructions_link: str

    :return: metadata of the new project
    :rtype: dict

    :raises AppException: if workflows are given for an unsupported project
        type, without classes, or reference a class name missing from ``classes``
    """
    if workflows:
        if project_type.capitalize() not in (
            constants.ProjectType.VECTOR.name,
            constants.ProjectType.PIXEL.name,
        ):
            raise AppException(
                f"Workflow is not supported in {project_type} project."
            )
        # validation only: the parsed result is intentionally discarded
        parse_obj_as(List[WorkflowEntity], workflows)
        if not classes:
            raise AppException(
                "Project with workflows can not be created without classes."
            )

    settings = parse_obj_as(List[SettingEntity], settings) if settings else []
    if classes:
        classes = parse_obj_as(List[AnnotationClassEntity], classes)

    if workflows and classes:
        known_names = [annotation_class.name for annotation_class in classes]
        unknown_names = [
            step["className"]
            for step in workflows
            if step["className"] not in known_names
        ]
        if unknown_names:
            # drop duplicates while keeping first-seen order
            invalid_classes = list(dict.fromkeys(unknown_names))
            raise AppException(
                f"There are no [{', '.join(invalid_classes)}] classes created in the project."
            )

    new_entity = entities.ProjectEntity(
        name=project_name,
        description=project_description,
        type=constants.ProjectType.get_value(project_type),
        settings=settings,
        instructions_link=instructions_link,
    )
    project_response = self.controller.projects.create(new_entity)
    project_response.raise_for_status()
    project = project_response.data

    if classes:
        classes_response = self.controller.annotation_classes.create_multiple(
            project, classes
        )
        classes_response.raise_for_status()
        project.classes = classes_response.data
    if workflows:
        workflow_response = self.controller.projects.set_workflows(
            project, workflows
        )
        workflow_response.raise_for_status()
        # re-read the workflow so the returned metadata reflects what was stored
        project.workflows = self.controller.projects.list_workflow(project).data
    return ProjectSerializer(project).serialize()
# TODO fix
def clone_project(
    self,
    project_name: Union[NotEmptyStr, dict],
    from_project: Union[NotEmptyStr, dict],
    project_description: Optional[NotEmptyStr] = None,
    copy_annotation_classes: Optional[bool] = True,
    copy_settings: Optional[bool] = True,
    copy_workflow: Optional[bool] = False,
    copy_contributors: Optional[bool] = False,
):
    """Create a new project in the team using annotation classes and settings from from_project.

    :param project_name: new project's name
    :type project_name: str

    :param from_project: the name of the project being used for duplication
    :type from_project: str

    :param project_description: the new project's description. If None, from_project's
        description will be used
    :type project_description: str

    :param copy_annotation_classes: enables copying annotation classes
    :type copy_annotation_classes: bool

    :param copy_settings: enables copying project settings
    :type copy_settings: bool

    :param copy_workflow: enables copying project workflow; the workflow is
        only copied when annotation classes are copied as well
    :type copy_workflow: bool

    :param copy_contributors: enables copying project contributors
    :type copy_contributors: bool

    :return: dict object metadata of the new project
    :rtype: dict

    :raises AppException: if ``copy_workflow`` is requested for a project that
        is neither Vector nor Pixel, or if reading the new project's metadata
        reports errors
    """
    response = self.controller.projects.get_metadata(
        self.controller.get_project(from_project),
        include_annotation_classes=copy_annotation_classes,
        include_settings=copy_settings,
        include_workflow=copy_workflow,
        include_contributors=copy_contributors,
    )
    response.raise_for_status()
    project: entities.ProjectEntity = response.data
    if copy_workflow and project.type not in (
        constants.ProjectType.VECTOR,
        constants.ProjectType.PIXEL,
    ):
        raise AppException(
            f"Workflow is not supported in {project.type.name} project."
        )

    project_copy = copy.copy(project)
    if project_copy.type in (
        constants.ProjectType.VECTOR,
        constants.ProjectType.PIXEL,
    ):
        project_copy.upload_state = constants.UploadState.INITIAL
    if project_description:
        project_copy.description = project_description
    else:
        project_copy.description = project.description
    project_copy.name = project_name
    create_response = self.controller.projects.create(project_copy)
    create_response.raise_for_status()
    new_project = create_response.data

    if copy_contributors:
        logger.info(f"Cloning contributors from {from_project} to {project_name}.")
        self.controller.projects.add_contributors(
            self.controller.team, new_project, project.contributors
        )
    if copy_annotation_classes:
        logger.info(
            f"Cloning annotation classes from {from_project} to {project_name}."
        )
        classes_response = self.controller.annotation_classes.create_multiple(
            new_project, project.classes
        )
        classes_response.raise_for_status()
        # The created classes belong to the clone, so attach them to the new
        # project entity instead of overwriting the source project's classes.
        new_project.classes = classes_response.data
    if copy_workflow:
        if not copy_annotation_classes:
            logger.info(
                f"Skipping the workflow clone from {from_project} to {project_name}."
            )
        else:
            logger.info(f"Cloning workflow from {from_project} to {project_name}.")
            workflow_response = self.controller.projects.set_workflows(
                new_project, project.workflows
            )
            workflow_response.raise_for_status()
            # Re-read the workflow of the clone (not of the source project),
            # mirroring what create_project does after set_workflows.
            new_project.workflows = self.controller.projects.list_workflow(
                new_project
            ).data

    response = self.controller.projects.get_metadata(
        new_project,
        include_settings=copy_settings,
        include_workflow=copy_workflow,
        include_contributors=copy_contributors,
        include_annotation_classes=copy_annotation_classes,
        include_complete_image_count=True,
    )
    if response.errors:
        raise AppException(response.errors)
    return ProjectSerializer(response.data).serialize()
def create_folder(self, project: NotEmptyStr, folder_name: NotEmptyStr):
    """Create a new folder in the project.

    :param project: project name
    :type project: str

    :param folder_name: the new folder's name
    :type folder_name: str

    :return: metadata of the new folder, without the ``completedCount`` and
        ``is_root`` fields
    :rtype: dict
    """
    parent_project = self.controller.get_project(project)
    new_folder = parent_project.create_folder(folder_name)
    return new_folder.dict(exclude={"completedCount", "is_root"})
def delete_project(self, project: Union[NotEmptyStr, dict]):
    """Delete the project.

    :param project: project name, or project metadata containing a ``name`` key
    :type project: str or dict
    """
    name = project["name"] if isinstance(project, dict) else project
    self.controller.projects.delete(name=name)
def rename_project(self, project: NotEmptyStr, new_name: NotEmptyStr):
    """Rename the project.

    :param project: project name
    :type project: str

    :param new_name: project's new name
    :type new_name: str

    :return: serialized metadata of the renamed project
    :rtype: dict

    :raises AppException: if the update request reports errors
    """
    old_name = project
    entity = self.controller.get_project(old_name)  # noqa
    entity.name = new_name
    response = self.controller.projects.update(entity)
    if response.errors:
        raise AppException(response.errors)
    renamed = response.data
    logger.info("Successfully renamed project %s to %s.", old_name, renamed.name)
    return ProjectSerializer(renamed).serialize()
def get_folder_metadata(self, project: NotEmptyStr, folder_name: NotEmptyStr):
    """Return the metadata of a folder.

    :param project: project name
    :type project: str

    :param folder_name: folder's name
    :type folder_name: str

    :return: metadata of folder, without the ``completedCount`` and
        ``is_root`` fields
    :rtype: dict

    :raises AppException: if the folder lookup yields no folder
    """
    _, folder = self.controller.get_project_folder(project, folder_name)
    if folder:
        return folder.dict(exclude={"completedCount", "is_root"})
    raise AppException("Folder not found.")
def delete_folders(self, project: NotEmptyStr, folder_names: List[NotEmptyStr]):
    """Delete folders of a project.

    :param project: project name
    :type project: str

    :param folder_names: names of the folders to delete
    :type folder_names: list of strs

    :raises AppException: if the delete call reports 0 deleted folders
    """
    project = self.controller.get_project(project)
    if project.delete_folders(folder_names) == 0:
        raise AppException("There is no folder to delete.")
    logger.info(f"Folders {folder_names} deleted in project {project.name}")
def search_folders(
    self,
    project: NotEmptyStr,
    folder_name: Optional[NotEmptyStr] = None,
    status: Optional[Union[FOLDER_STATUS, List[FOLDER_STATUS]]] = None,
    return_metadata: Optional[bool] = False,
):
    """Search the folders of a project. The root folder is never included.

    :param project: project name
    :type project: str

    :param folder_name: folder name to search for. If None, no name filter
        is applied.
    :type folder_name: str

    :param status: search folders via status. If None, all folders will be returned. \n
        Available statuses are::

             * NotStarted
             * InProgress
             * Completed
             * OnHold
    :type status: str or list of str

    :param return_metadata: return metadata of folders instead of names
    :type return_metadata: bool

    :return: folder names or metadatas
    :rtype: list of strs or dicts
    """
    project = self.controller.get_project(project)

    # ``status`` may be a single value or a collection (see the signature).
    # Normalise it to a list, as search_projects does, so that a collection
    # is not handed to FolderStatus.get_value as one value.
    if not status:
        requested_statuses = []
    elif isinstance(status, (list, tuple, set)):
        requested_statuses = list(status)
    else:
        requested_statuses = [status]

    condition = EmptyCondition()
    if folder_name:
        condition &= Condition("name", folder_name, EQ)
    if return_metadata:
        condition &= Condition("includeUsers", return_metadata, EQ)
    for status_name in requested_statuses:
        condition &= Condition(
            "status", constants.FolderStatus.get_value(status_name), EQ
        )

    folders = [
        folder for folder in project.list_folders(condition) if not folder.is_root
    ]
    if return_metadata:
        return [
            folder.dict(exclude={"completedCount", "is_root"}) for folder in folders
        ]
    return [folder.name for folder in folders]
def get_project_metadata(
    self,
    project: Union[NotEmptyStr, dict],
    include_annotation_classes: Optional[bool] = False,
    include_settings: Optional[bool] = False,
    include_workflow: Optional[bool] = False,
    include_contributors: Optional[bool] = False,
    include_complete_item_count: Optional[bool] = False,
):
    """Return the serialized metadata of a project.

    The ``include_*`` flags are forwarded, in the order declared here, to
    the controller's ``projects.get_metadata`` call.

    :param project: project name
    :type project: str

    :param include_annotation_classes: request the project's annotation classes
    :type include_annotation_classes: bool

    :param include_settings: request the project's settings
    :type include_settings: bool

    :param include_workflow: request the project's workflow
    :type include_workflow: bool

    :param include_contributors: request the project's contributors
    :type include_contributors: bool

    :param include_complete_item_count: request the project's completed item count
    :type include_complete_item_count: bool

    :return: metadata of project
    :rtype: dict

    :raises AppException: if the metadata request reports errors
    """
    project_name, _ = extract_project_folder(project)
    project_entity = self.controller.get_project(project_name)
    response = self.controller.projects.get_metadata(
        project_entity,
        include_annotation_classes,
        include_settings,
        include_workflow,
        include_contributors,
        include_complete_item_count,
    )
    if response.errors:
        raise AppException(response.errors)
    return ProjectSerializer(response.data).serialize()
def get_project_settings(self, project: Union[NotEmptyStr, dict]):
    """Return the project's settings.

    Return value example: [{ "attribute" : "Brightness", "value" : 10, ...},...]

    :param project: project name or metadata
    :type project: str or dict

    :return: project settings
    :rtype: list of dicts
    """
    project_name, _ = extract_project_folder(project)
    project_entity = self.controller.projects.get_by_name(project_name).data
    raw_settings = self.controller.projects.list_settings(project_entity).data
    return [
        SettingsSerializer(setting.dict()).serialize() for setting in raw_settings
    ]
def get_project_workflow(self, project: Union[str, dict]):
    """Return the project's workflow.

    Return value example: [{ "step" : <step_num>, "className" : <annotation_class>, "tool" : <tool_num>, ...},...]

    :param project: project name or metadata
    :type project: str or dict

    :return: project workflow
    :rtype: list of dicts

    :raises AppException: if the workflow request reports errors
    """
    project_name, _ = extract_project_folder(project)
    project_entity = self.controller.get_project(project_name)
    response = self.controller.projects.list_workflow(project_entity)
    if response.errors:
        raise AppException(response.errors)
    return response.data
def search_annotation_classes(
    self,
    project: Union[NotEmptyStr, dict, Project],
    name_contains: Optional[str] = None,
):
    """Search the annotation classes of a project by name.

    :param project: project name, project metadata or a ``Project`` instance
    :type project: str or dict or Project

    :param name_contains: search string, sent together with a ``pattern``
        flag set to True. If None (or empty), no filter is applied and all
        annotation classes are returned.
    :type name_contains: str

    :return: annotation classes of the project
    :rtype: list of dicts
    """
    if isinstance(project, Project):
        project = project.dict()
    project_name, _ = extract_project_folder(project)
    project = self.controller.get_project(project_name)

    if name_contains:
        condition = Condition("name", name_contains, EQ) & Condition(
            "pattern", True, EQ
        )
    else:
        condition = None

    found_classes = project.list_annotation_classes(condition)
    return [annotation_class.dict() for annotation_class in found_classes]
def set_project_status(self, project: NotEmptyStr, status: PROJECT_STATUS):
    """Set project status.

    :param project: project name
    :type project: str

    :param status: status to set.

        Available statuses are::

             * NotStarted
             * InProgress
             * Completed
             * OnHold
    :type status: str

    :raises AppException: if the update request reports errors
    """
    project = self.controller.get_project(pk=project)
    project.status = constants.ProjectStatus.get_value(status)
    response = self.controller.projects.update(project)
    if not response.errors:
        logger.info(f"Successfully updated {project.name} status to {status}")
        return
    raise AppException(f"Failed to change {project.name} status.")
def set_folder_status(
    self, project: NotEmptyStr, folder: NotEmptyStr, status: FOLDER_STATUS
):
    """Set folder status.

    :param project: project name
    :type project: str

    :param folder: folder name
    :type folder: str

    :param status: status to set. \n
        Available statuses are::

             * NotStarted
             * InProgress
             * Completed
             * OnHold
    :type status: str

    :raises AppException: with the core error's message when an
        ``SAException`` is raised, or with a generic failure message for any
        other exception
    """
    project = self.controller.get_project(project)
    try:
        new_status = FolderStatus[status]
        # on success ``folder`` is rebound from the name to the returned object
        folder = project.set_folder_status(folder, new_status)
    except SAException as error:
        raise AppException(error.message)
    except Exception:
        # ``folder`` is still the folder name here because the rebinding never happened
        raise AppException(f"Failed to change {project.name}/{folder} status.")
    else:
        logger.info(
            f"Successfully updated {project.name}/{folder.name} status to {status}"
        )
def set_project_default_image_quality_in_editor(
    self,
    project: Union[NotEmptyStr, dict],
    image_quality_in_editor: Optional[str],
):
    """Set the project's default image quality in editor setting.

    :param project: project name or metadata
    :type project: str or dict

    :param image_quality_in_editor: new setting value, should be "original" or "compressed"
    :type image_quality_in_editor: str

    :return: the data returned by the settings update request

    :raises AppException: if the settings update request reports errors
    """
    project_name, _ = extract_project_folder(project)
    quality_value = ImageQuality.get_value(image_quality_in_editor)
    project_entity = self.controller.get_project(project_name)
    quality_setting = {"attribute": "ImageQuality", "value": quality_value}
    response = self.controller.projects.set_settings(
        project=project_entity,
        settings=[quality_setting],
    )
    if response.errors:
        raise AppException(response.errors)
    return response.data
def pin_image(
    self,
    project: Union[NotEmptyStr, dict],
    image_name: str,
    pin: Optional[bool] = True,
):
    """Pin (or unpin) an image.

    :param project: project name or folder path (e.g., "project1/folder1")
    :type project: str

    :param image_name: image name
    :type image_name: str

    :param pin: sets to pin if True, else unpins image
    :type pin: bool

    :raises AppException: if the item lookup reports errors
    """
    project, folder = self.controller.get_project_folder_by_path(project)
    lookup = self.controller.items.get_by_name(project, folder, image_name)
    if lookup.errors:
        raise AppException(lookup.errors)
    image = lookup.data
    # the flag is stored as an integer
    image.is_pinned = int(pin)
    self.controller.items.update(project=project, item=image)
def delete_items(self, project: str, items: Optional[List[str]] = None):
    """Delete items in a given project.

    :param project: project name or folder path (e.g., "project1/folder1")
    :type project: str

    :param items: to be deleted items' names. If None, all the items will be deleted
    :type items: list of str
    """
    project_name, folder_name = extract_project_folder(project)
    target_folder = self.controller.get_project(project_name).get_folder(
        folder_name
    )
    target_folder.delete_items(item_names=items)
def assign_items(
    self, project: Union[NotEmptyStr, dict], items: List[str], user: str
):
    """Assign items to a user.

    The assignment role, QA or Annotator, will be deduced from the user's
    role in the project. The type of the objects (image, video or text)
    will be deduced from the project type. With SDK, the user can be
    assigned to a role in the project with the share_project function.

    :param project: project name or folder path (e.g., "project1/folder1")
    :type project: str

    :param items: list of items to assign
    :type items: list of str

    :param user: user email
    :type user: str
    """
    project_name, folder_name = extract_project_folder(project)
    target_folder = self.controller.get_project(project_name).get_folder(
        folder_name
    )
    target_folder.assign_items(user=user, item_names=items)
def unassign_items(
    self, project: Union[NotEmptyStr, dict], items: List[NotEmptyStr]
):
    """Remove the assignment of the given items for all assignees.

    With SDK, the user can be assigned to a role in the project with the
    share_project function.

    :param project: project name or folder path (e.g., "project1/folder1")
    :type project: str

    :param items: list of items to unassign
    :type items: list of str
    """
    project_name, folder_name = extract_project_folder(project)
    target_folder = self.controller.get_project(project_name).get_folder(
        folder_name
    )
    # NOTE(review): this calls ``assign_items`` without a user. Confirm that
    # the folder API treats a missing user as "unassign"; otherwise this looks
    # like a copy-paste slip and a dedicated unassign call is needed.
    target_folder.assign_items(item_names=items)
def unassign_folder(self, project_name: NotEmptyStr, folder_name: NotEmptyStr):
    """Remove the assignment of the given folder for all assignees.

    With SDK, the user can be assigned to a role in the project
    with the share_project function.

    :param project_name: project name
    :type project_name: str

    :param folder_name: folder name to remove assignees
    :type folder_name: str
    """
    self.controller.get_project(project_name).get_folder(folder_name).unassign()
def assign_folder(
    self,
    project_name: NotEmptyStr,
    folder_name: NotEmptyStr,
    users: List[NotEmptyStr],
):
    """Assign a folder to users.

    With SDK, the user can be assigned to a role in the project with the
    share_project function.

    :param project_name: project name
    :type project_name: str

    :param folder_name: folder name to assign
    :type folder_name: str

    :param users: list of user emails
    :type users: list of str
    """
    # todo check to delete the function
    project = self.controller.get_project(project_name)
    target_folder = project.get_folder(folder_name)
    target_folder.assign(users=users)
def upload_images_from_folder_to_project(
self,
project: Union[NotEmptyStr, dict],
folder_path: Union[NotEmptyStr, Path],
extensions: Optional[
Union[List[NotEmptyStr], Tuple[NotEmptyStr]]
] = constants.DEFAULT_IMAGE_EXTENSIONS,
annotation_status="NotStarted",
from_s3_bucket=None,
exclude_file_patterns: Optional[
Iterable[NotEmptyStr]
] = constants.DEFAULT_FILE_EXCLUDE_PATTERNS,
recursive_subfolders: Optional[bool] = False,
image_quality_in_editor: Optional[str] = None,
):
"""Uploads all images with given extensions from folder_path to the project.
Sets status of all the uploaded images to set_status if it is not None.
If an image with existing name already exists in the project it won't be uploaded,
and its path will be appended to the third member of return value of this
function.
:param project: project name or folder path (e.g., "project1/folder1")
:type project: str or dict
:param folder_path: from which folder to upload the images
:type folder_path: Path-like (str or Path)
:param extensions: tuple or list of filename extensions to include from folder
:type extensions: tuple or list of strs
:param annotation_status: value to set the annotation statuses of the uploaded images
NotStarted InProgress QualityCheck Returned Completed Skipped
:type annotation_status: str
:param from_s3_bucket: AWS S3 bucket to use. If None then folder_path is in local filesystem
:type from_s3_bucket: str