Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 37 additions & 50 deletions cterasdk/cio/core/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,54 +40,56 @@ def _extract_rc_msg(result):
return getattr(result, 'rc', None), getattr(result, 'msg', None)


_STRICT_PERMISSION_ERROR_SET = {
(None, None),
(None, ''),
(0, None),
('0', None),
(None, 'permission denied'),
(None, 'access denied'),
(None, 'read only'),
(None, 'action is not allowed'),
('permissiondenied', None),
(None, 'permissiondenied'),
('permissiondenied', 'permissiondenied'),
_STRICT_PERMISSION_DENIED_MESSAGES = {
'permission denied',
'access denied',
'read only',
'action is not allowed',
}

_STRICT_PERMISSION_TASK_ERROR_SET = {
(None, None, 'permissiondenied'),
(None, None, 'permission denied'),
(None, None, 'access denied'),
(None, None, 'read only'),
(None, None, 'action is not allowed'),
(None, 'permission denied', None),
(None, 'access denied', None),
(None, 'read only', None),
(None, 'action is not allowed', None),
(0, None, 'permissiondenied'),
('0', None, 'permissiondenied'),
}
# Some APIs return condensed "permissiondenied" in rc/msg/error_type.
_STRICT_PERMISSION_DENIED_TOKENS = {'permissiondenied'}
_STRICT_PERMISSION_AMBIGUOUS_RC = {None, 0, '0'}
_STRICT_PERMISSION_EMPTY_MESSAGES = {None, ''}


def _raise_strict_permission_denied(result, path):
rc, msg = _extract_rc_msg(result)
rc = _normalize_rc(rc)
msg = _normalize_msg(msg)
rc, msg = _extract_normalized_rc_msg(result)
logger.info(
'strict_permission response for %s: rc=%r msg=%r raw=%s',
path, rc, msg, type(result).__name__
)
if (rc, msg) in _STRICT_PERMISSION_ERROR_SET:
if _is_strict_permission_denied_response(rc, msg):
raise exceptions.io.core.PrivilegeError(path)


def _extract_task_error_tuple(result):
def _extract_normalized_rc_msg(result):
rc, msg = _extract_rc_msg(result)
return _normalize_rc(rc), _normalize_msg(msg)


def _extract_task_error_fields(result):
rc = _normalize_rc(getattr(result, 'rc', None))
msg = _normalize_msg(getattr(result, 'msg', None))
error_type = _normalize_msg(getattr(result, 'error_type', None))
return rc, msg, error_type


def _is_strict_permission_denied_response(rc, msg):
if msg in _STRICT_PERMISSION_DENIED_MESSAGES:
return True
if rc in _STRICT_PERMISSION_DENIED_TOKENS or msg in _STRICT_PERMISSION_DENIED_TOKENS:
return True
return rc in _STRICT_PERMISSION_AMBIGUOUS_RC and msg in _STRICT_PERMISSION_EMPTY_MESSAGES


def _is_strict_permission_denied_task(rc, msg, error_type):
_ = rc
if error_type in _STRICT_PERMISSION_DENIED_MESSAGES or error_type in _STRICT_PERMISSION_DENIED_TOKENS:
return True
return msg in _STRICT_PERMISSION_DENIED_MESSAGES


def split_file_directory(listdir, receiver, destination):
"""
Split a path into its parent directory and final component.
Expand Down Expand Up @@ -685,17 +687,7 @@ def _before_command(self):
def _parents_generator(self):
if self.parents:
parts = self.path.parts
start_index = 1
known_roots = ('My Files', 'Shared With Me', 'Shared', 'Team Portal')
if parts:
if parts[0] in known_roots:
start_index = 2
elif parts[0] in ('Users', 'Groups') and len(parts) > 1:
if len(parts) > 2 and parts[2] in known_roots:
start_index = 4
else:
start_index = 3
for i in range(start_index, len(parts)):
for i in range(1, len(parts)):
yield automatic_resolution('/'.join(parts[:i]), self._receiver.context)
else:
yield self.path
Expand All @@ -707,8 +699,7 @@ def _execute(self):
CreateDirectory(
self._function,
self._receiver,
path,
strict_permission=self._strict_permission
path
).execute()
except exceptions.io.core.CreateDirectoryError as e:
CreateDirectory._suppress_file_conflict_error(e)
Expand All @@ -722,8 +713,7 @@ async def _a_execute(self):
await CreateDirectory(
self._function,
self._receiver,
path,
strict_permission=self._strict_permission
path
).a_execute()
except exceptions.io.core.CreateDirectoryError as e:
CreateDirectory._suppress_file_conflict_error(e)
Expand Down Expand Up @@ -1034,18 +1024,15 @@ async def _a_execute(self):

def _handle_response(self, r):
if self._strict_permission:
rc, msg, error_type = _extract_task_error_tuple(r)
if (rc, msg, error_type) in _STRICT_PERMISSION_TASK_ERROR_SET:
rc, msg, error_type = _extract_task_error_fields(r)
if _is_strict_permission_denied_task(rc, msg, error_type):
raise exceptions.io.core.PrivilegeError('')
if not self.block:
return r

if r.completed:
return self._task_complete(r)

if r.completed_with_warnings:
return r

if r.failed or r.completed_with_warnings:
return self._task_error(r)

Expand Down
13 changes: 13 additions & 0 deletions tests/ut/core/user/test_mkdir.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,16 @@ def test_mkdir_strict_permission_denied(self):
self._init_services(execute_response=execute_response)
with self.assertRaises(exceptions.io.core.PrivilegeError):
self._services.files.mkdir(self._directory, strict_permission=True)

def test_mkdir_strict_permission_denied_message(self):
execute_response = Object(msg='access denied')
self._init_services(execute_response=execute_response)
with self.assertRaises(exceptions.io.core.PrivilegeError):
self._services.files.mkdir(self._directory, strict_permission=True)

def test_makedirs_strict_permission_root_path(self):
rooted_directories = 'Team Portal/Engineering/Documents'
self._init_services(execute_response=None)
ret = self._services.files.makedirs(rooted_directories, strict_permission=True)
self.assertEqual(self._services.api.execute.call_count, len(rooted_directories.split('/')))
self.assertEqual(ret, rooted_directories)
23 changes: 23 additions & 0 deletions tests/ut/core/user/test_move.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,29 @@ def test_move_strict_permission_denied(self):
self._services.files.move(self._source, destination=self._dest, strict_permission=True)
self._services.tasks.wait.assert_called_once_with(self._task_reference)

def test_move_strict_permission_denied_error_type(self):
task = Object(error_type='permissiondenied')
self._services.tasks.wait = mock.MagicMock(return_value=task)
self._init_services(execute_response=self._task_reference)
with self.assertRaises(exceptions.io.core.PrivilegeError):
self._services.files.move(self._source, destination=self._dest, strict_permission=True)
self._services.tasks.wait.assert_called_once_with(self._task_reference)

def test_move_completed_with_warnings_raises_move_error(self):
task = mock.MagicMock()
task.completed = False
task.completed_with_warnings = True
task.failed = False
task.cursor = None
task.error_type = None
task.unknown_object.return_value = True
task.progress_str = None
self._services.tasks.wait = mock.MagicMock(return_value=task)
self._init_services(execute_response=self._task_reference)
with self.assertRaises(exceptions.io.core.MoveError):
self._services.files.move(self._source, destination=self._dest, strict_permission=True)
self._services.tasks.wait.assert_called_once_with(self._task_reference)

def _create_move_resource_param(self):
destinations = [base_user.BaseCoreServicesTest.encode_path(self._dest + '/' + self._filename)]
return self._create_action_resource_param([base_user.BaseCoreServicesTest.encode_path(self._source)], destinations)