From 81e598b9c36242450ae6b664c9af76f21b3676c8 Mon Sep 17 00:00:00 2001 From: aahallal Date: Thu, 9 Apr 2026 18:03:38 +0000 Subject: [PATCH] Change CodeDeploy config file permissions to owner only --- .../next-release/bugfix-codedeploy-54753.json | 5 + awscli/customizations/codedeploy/register.py | 112 ++++++------- .../codedeploy/test_register.py | 155 +++++++++++------- 3 files changed, 152 insertions(+), 120 deletions(-) create mode 100644 .changes/next-release/bugfix-codedeploy-54753.json diff --git a/.changes/next-release/bugfix-codedeploy-54753.json b/.changes/next-release/bugfix-codedeploy-54753.json new file mode 100644 index 000000000000..29e521f585ec --- /dev/null +++ b/.changes/next-release/bugfix-codedeploy-54753.json @@ -0,0 +1,5 @@ +{ + "type": "bugfix", + "category": "codedeploy", + "description": "Tighten file permissions for CodeDeploy configuration file" +} diff --git a/awscli/customizations/codedeploy/register.py b/awscli/customizations/codedeploy/register.py index e959044c1cf9..80dfc9f35f83 100644 --- a/awscli/customizations/codedeploy/register.py +++ b/awscli/customizations/codedeploy/register.py @@ -11,13 +11,19 @@ # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. +import os import sys -from awscli.customizations.commands import BasicCommand from awscli.customizations.codedeploy.systems import DEFAULT_CONFIG_FILE -from awscli.customizations.codedeploy.utils import \ - validate_region, validate_instance_name, validate_tags, \ - validate_iam_user_arn, INSTANCE_NAME_ARG, IAM_USER_ARN_ARG +from awscli.customizations.codedeploy.utils import ( + IAM_USER_ARN_ARG, + INSTANCE_NAME_ARG, + validate_iam_user_arn, + validate_instance_name, + validate_region, + validate_tags, +) +from awscli.customizations.commands import BasicCommand from awscli.utils import create_nested_client @@ -39,15 +45,15 @@ class Register(BasicCommand): "Key": { "description": "The tag key.", "type": "string", - "required": True + "required": True, }, "Value": { "description": "The tag value.", "type": "string", - "required": True - } - } - } + "required": True, + }, + }, + }, } ARG_TABLE = [ @@ -61,9 +67,9 @@ class Register(BasicCommand): 'help_text': ( 'Optional. The list of key/value pairs to tag the on-premises ' 'instance.' - ) + ), }, - IAM_USER_ARN_ARG + IAM_USER_ARN_ARG, ] def _run_main(self, parsed_args, parsed_globals): @@ -79,12 +85,10 @@ def _run_main(self, parsed_args, parsed_globals): 'codedeploy', region_name=params.region, endpoint_url=parsed_globals.endpoint_url, - verify=parsed_globals.verify_ssl + verify=parsed_globals.verify_ssl, ) self.iam = create_nested_client( - self._session, - 'iam', - region_name=params.region + self._session, 'iam', region_name=params.region ) try: @@ -97,54 +101,41 @@ def _run_main(self, parsed_args, parsed_globals): if params.tags: self._add_tags(params) sys.stdout.write( - 'Copy the on-premises configuration file named {0} to the ' + f'Copy the on-premises configuration file named {DEFAULT_CONFIG_FILE} to the ' 'on-premises instance, and run the following command on the ' 'on-premises instance to install and configure the AWS ' 'CodeDeploy Agent:\n' - 'aws deploy install --config-file {0}\n'.format( - DEFAULT_CONFIG_FILE - ) + f'aws deploy install --config-file {DEFAULT_CONFIG_FILE}\n' ) except Exception as e: sys.stdout.flush() sys.stderr.write( 'ERROR\n' - '{0}\n' + f'{e}\n' 'Register the on-premises instance by following the ' 'instructions in "Configure Existing On-Premises Instances by ' 'Using AWS CodeDeploy" in the AWS CodeDeploy User ' - 'Guide.\n'.format(e) + 'Guide.\n' ) def _create_iam_user(self, params): sys.stdout.write('Creating the IAM user... ') params.user_name = params.instance_name response = self.iam.create_user( - Path='/AWS/CodeDeploy/', - UserName=params.user_name + Path='/AWS/CodeDeploy/', UserName=params.user_name ) params.iam_user_arn = response['User']['Arn'] - sys.stdout.write( - 'DONE\n' - 'IamUserArn: {0}\n'.format( - params.iam_user_arn - ) - ) + sys.stdout.write('DONE\n' f'IamUserArn: {params.iam_user_arn}\n') def _create_access_key(self, params): sys.stdout.write('Creating the IAM user access key... ') - response = self.iam.create_access_key( - UserName=params.user_name - ) + response = self.iam.create_access_key(UserName=params.user_name) params.access_key_id = response['AccessKey']['AccessKeyId'] params.secret_access_key = response['AccessKey']['SecretAccessKey'] sys.stdout.write( 'DONE\n' - 'AccessKeyId: {0}\n' - 'SecretAccessKey: {1}\n'.format( - params.access_key_id, - params.secret_access_key - ) + f'AccessKeyId: {params.access_key_id}\n' + f'SecretAccessKey: {params.secret_access_key}\n' ) def _create_user_policy(self, params): @@ -163,49 +154,50 @@ def _create_user_policy(self, params): self.iam.put_user_policy( UserName=params.user_name, PolicyName=params.policy_name, - PolicyDocument=params.policy_document + PolicyDocument=params.policy_document, ) sys.stdout.write( 'DONE\n' - 'PolicyName: {0}\n' - 'PolicyDocument: {1}\n'.format( - params.policy_name, - params.policy_document - ) + f'PolicyName: {params.policy_name}\n' + f'PolicyDocument: {params.policy_document}\n' ) def _create_config(self, params): sys.stdout.write( - 'Creating the on-premises instance configuration file named {0}' - '...'.format(DEFAULT_CONFIG_FILE) + f'Creating the on-premises instance configuration file named {DEFAULT_CONFIG_FILE}' + '...' ) - with open(DEFAULT_CONFIG_FILE, 'w') as f: - f.write( - '---\n' - 'region: {0}\n' - 'iam_user_arn: {1}\n' - 'aws_access_key_id: {2}\n' - 'aws_secret_access_key: {3}\n'.format( - params.region, - params.iam_user_arn, - params.access_key_id, - params.secret_access_key + try: + fd = os.open( + DEFAULT_CONFIG_FILE, + os.O_WRONLY | os.O_CREAT | os.O_TRUNC, + 0o600, + ) + with os.fdopen(fd, 'w') as f: + os.chmod(DEFAULT_CONFIG_FILE, 0o600) + f.write( + '---\n' + f'region: {params.region}\n' + f'iam_user_arn: {params.iam_user_arn}\n' + f'aws_access_key_id: {params.access_key_id}\n' + f'aws_secret_access_key: {params.secret_access_key}\n' ) + except OSError as e: + raise RuntimeError( + f'Failed to create config file {DEFAULT_CONFIG_FILE}: {e}' ) sys.stdout.write('DONE\n') def _register_instance(self, params): sys.stdout.write('Registering the on-premises instance... ') self.codedeploy.register_on_premises_instance( - instanceName=params.instance_name, - iamUserArn=params.iam_user_arn + instanceName=params.instance_name, iamUserArn=params.iam_user_arn ) sys.stdout.write('DONE\n') def _add_tags(self, params): sys.stdout.write('Adding tags to the on-premises instance... ') self.codedeploy.add_tags_to_on_premises_instances( - tags=params.tags, - instanceNames=[params.instance_name] + tags=params.tags, instanceNames=[params.instance_name] ) sys.stdout.write('DONE\n') diff --git a/tests/unit/customizations/codedeploy/test_register.py b/tests/unit/customizations/codedeploy/test_register.py index 12eac78b0245..a1583857f061 100644 --- a/tests/unit/customizations/codedeploy/test_register.py +++ b/tests/unit/customizations/codedeploy/test_register.py @@ -12,6 +12,7 @@ # language governing permissions and limitations under the License. from argparse import Namespace + from awscli.customizations.codedeploy.register import Register from awscli.customizations.codedeploy.utils import MAX_TAGS_PER_INSTANCE from awscli.testutils import mock, unittest @@ -50,11 +51,21 @@ def setUp(self): self.globals.endpoint_url = self.endpoint_url self.globals.verify_ssl = False - self.open_patcher = mock.patch( - 'awscli.customizations.codedeploy.register.open', - mock.mock_open(), create=True + self.os_open_patcher = mock.patch( + 'awscli.customizations.codedeploy.register.os.open', return_value=3 + ) + self.os_open = self.os_open_patcher.start() + + self.os_fdopen_patcher = mock.patch( + 'awscli.customizations.codedeploy.register.os.fdopen', + mock.mock_open(), ) - self.open = self.open_patcher.start() + self.os_fdopen = self.os_fdopen_patcher.start() + + self.os_chmod_patcher = mock.patch( + 'awscli.customizations.codedeploy.register.os.chmod' + ) + self.os_chmod = self.os_chmod_patcher.start() self.codedeploy = mock.MagicMock() @@ -65,7 +76,7 @@ def setUp(self): self.iam.create_access_key.return_value = { 'AccessKey': { 'AccessKeyId': self.access_key_id, - 'SecretAccessKey': self.secret_access_key + 'SecretAccessKey': self.secret_access_key, } } @@ -74,7 +85,9 @@ def setUp(self): self.register = Register(self.session) def tearDown(self): - self.open_patcher.stop() + self.os_open_patcher.stop() + self.os_fdopen_patcher.stop() + self.os_chmod_patcher.stop() def test_register_throws_on_invalid_region(self): self.globals.region = None @@ -85,7 +98,8 @@ def test_register_throws_on_invalid_region(self): def test_register_throws_on_invalid_instance_name(self): self.args.instance_name = 'invalid%@^&%#&' with self.assertRaisesRegex( - ValueError, 'Instance name contains invalid characters.'): + ValueError, 'Instance name contains invalid characters.' + ): self.register._run_main(self.args, self.globals) def test_register_throws_on_invalid_tags(self): @@ -93,9 +107,9 @@ def test_register_throws_on_invalid_tags(self): {'Key': 'k' + str(x), 'Value': 'v' + str(x)} for x in range(11) ] with self.assertRaisesRegex( - ValueError, - 'Instances can only have a maximum of {0} tags.'.format( - MAX_TAGS_PER_INSTANCE)): + ValueError, + f'Instances can only have a maximum of {MAX_TAGS_PER_INSTANCE} tags.', + ): self.register._run_main(self.args, self.globals) def test_register_throws_on_invalid_iam_user_arn(self): @@ -105,22 +119,23 @@ def test_register_throws_on_invalid_iam_user_arn(self): def test_register_creates_clients(self): self.register._run_main(self.args, self.globals) - self.session.create_client.assert_has_calls([ - mock.call( - 'codedeploy', - region_name=self.region, - endpoint_url=self.endpoint_url, - verify=self.globals.verify_ssl - ), - mock.call('iam', region_name=self.region) - ]) + self.session.create_client.assert_has_calls( + [ + mock.call( + 'codedeploy', + region_name=self.region, + endpoint_url=self.endpoint_url, + verify=self.globals.verify_ssl, + ), + mock.call('iam', region_name=self.region), + ] + ) def test_register_with_no_iam_user_arn(self): self.args.iam_user_arn = None self.register._run_main(self.args, self.globals) self.register.iam.create_user.assert_called_with( - Path=self.path, - UserName=self.instance_name + Path=self.path, UserName=self.instance_name ) self.assertIn('iam_user_arn', self.args) self.assertEqual(self.iam_user_arn, self.args.iam_user_arn) @@ -134,30 +149,28 @@ def test_register_with_no_iam_user_arn(self): self.register.iam.put_user_policy.assert_called_with( UserName=self.instance_name, PolicyName=self.policy_name, - PolicyDocument=self.policy_document + PolicyDocument=self.policy_document, ) self.assertIn('policy_name', self.args) self.assertEqual(self.policy_name, self.args.policy_name) self.assertIn('policy_document', self.args) self.assertEqual(self.policy_document, self.args.policy_document) - self.open.assert_called_with(self.config_file, 'w') - self.open().write.assert_called_with( + self.os_open.assert_called_with( + self.config_file, + mock.ANY, + 0o600, + ) + self.os_fdopen.assert_called_with(3, 'w') + self.os_fdopen().write.assert_called_with( '---\n' - 'region: {0}\n' - 'iam_user_arn: {1}\n' - 'aws_access_key_id: {2}\n' - 'aws_secret_access_key: {3}\n'.format( - self.region, - self.iam_user_arn, - self.access_key_id, - self.secret_access_key - ) - ) - self.register.codedeploy.register_on_premises_instance.\ - assert_called_with( - instanceName=self.instance_name, - iamUserArn=self.iam_user_arn - ) + f'region: {self.region}\n' + f'iam_user_arn: {self.iam_user_arn}\n' + f'aws_access_key_id: {self.access_key_id}\n' + f'aws_secret_access_key: {self.secret_access_key}\n' + ) + self.register.codedeploy.register_on_premises_instance.assert_called_with( + instanceName=self.instance_name, iamUserArn=self.iam_user_arn + ) def test_register_with_iam_user_arn(self): self.args.iam_user_arn = self.iam_user_arn @@ -165,21 +178,17 @@ def test_register_with_iam_user_arn(self): self.assertFalse(self.register.iam.create_user.called) self.assertFalse(self.register.iam.create_access_key.called) self.assertFalse(self.register.iam.put_user_policy.called) - self.assertFalse(self.open.called) - self.register.codedeploy.register_on_premises_instance.\ - assert_called_with( - instanceName=self.instance_name, - iamUserArn=self.iam_user_arn - ) + self.assertFalse(self.os_open.called) + self.register.codedeploy.register_on_premises_instance.assert_called_with( + instanceName=self.instance_name, iamUserArn=self.iam_user_arn + ) def test_register_with_no_tags(self): self.args.tags = None self.register._run_main(self.args, self.globals) - self.register.codedeploy.register_on_premises_instance.\ - assert_called_with( - instanceName=self.instance_name, - iamUserArn=self.iam_user_arn - ) + self.register.codedeploy.register_on_premises_instance.assert_called_with( + instanceName=self.instance_name, iamUserArn=self.iam_user_arn + ) self.assertFalse( self.register.codedeploy.add_tags_to_on_premises_instances.called ) @@ -187,16 +196,42 @@ def test_register_with_no_tags(self): def test_register_with_tags(self): self.args.tags = self.tags self.register._run_main(self.args, self.globals) - self.register.codedeploy.register_on_premises_instance.\ - assert_called_with( - instanceName=self.instance_name, - iamUserArn=self.iam_user_arn - ) - self.register.codedeploy.add_tags_to_on_premises_instances.\ - assert_called_with( - tags=self.tags, - instanceNames=[self.instance_name] - ) + self.register.codedeploy.register_on_premises_instance.assert_called_with( + instanceName=self.instance_name, iamUserArn=self.iam_user_arn + ) + self.register.codedeploy.add_tags_to_on_premises_instances.assert_called_with( + tags=self.tags, instanceNames=[self.instance_name] + ) + + def test_create_config_raises_runtime_error_on_open_failure(self): + self.args.iam_user_arn = None + self.os_open.side_effect = OSError('permission denied') + with self.assertRaisesRegex( + RuntimeError, 'Failed to create config file' + ): + self.register._create_config(self.args) + + def test_create_config_raises_runtime_error_on_chmod_failure(self): + self.args.iam_user_arn = None + self.os_chmod.side_effect = OSError('permission denied') + with self.assertRaisesRegex( + RuntimeError, 'Failed to create config file' + ): + self.register._create_config(self.args) + + def test_create_config_uses_restricted_permissions(self): + self.args.iam_user_arn = None + self.register._run_main(self.args, self.globals) + self.os_open.assert_called_with( + self.config_file, + mock.ANY, + 0o600, + ) + + def test_create_config_chmods_existing_file(self): + self.args.iam_user_arn = None + self.register._run_main(self.args, self.globals) + self.os_chmod.assert_called_with(self.config_file, 0o600) if __name__ == "__main__":