Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 44 additions & 23 deletions test/integration/test_cd_tekton_pipeline_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from ibm_cloud_sdk_core import *
import os
import time
import pytest
from environs import Env
from ibm_continuous_delivery.cd_toolchain_v2 import *
Expand All @@ -37,6 +38,7 @@
trigger_prop_name_link = None
pipeline_run_id_link = None
rerun_id_link = None
run_log_id_link = None
env = Env()


Expand Down Expand Up @@ -580,29 +582,48 @@ def test_list_tekton_pipeline_runs_with_pager(self):
f"\n list_tekton_pipeline_runs() returned a total of {len(all_results)} items(s) using TektonPipelineRunsPager."
)

# @needscredentials
# def test_get_tekton_pipeline_run_logs(self):
# global run_log_id_link
# response = self.cd_pipeline_service.get_tekton_pipeline_run_logs(
# pipeline_id=pipeline_tool_id_link,
# id=pipeline_run_id_link
# )
# assert response.get_status_code() == 200
# logs_collection = response.get_result()
# assert logs_collection is not None
# print(f'\n logs_collection: {logs_collection}')
# print(f'\n len(logs_collection): {len(logs_collection)}')

# @needscredentials
# def test_get_tekton_pipeline_run_log_content(self):
# response = self.cd_pipeline_service.get_tekton_pipeline_run_log_content(
# pipeline_id=pipeline_tool_id_link,
# pipeline_run_id='bf4b3abd-0c93-416b-911e-9cf42f1a1085',
# id=pipeline_tool_id_link
# )
# assert response.get_status_code() == 200
# step_log = response.get_result()
# assert step_log is not None
@needscredentials
def test_get_tekton_pipeline_run_logs(self):
global run_log_id_link
response = self.cd_pipeline_service.get_tekton_pipeline_run_logs(
pipeline_id=pipeline_tool_id_link, id=pipeline_run_id_link
)
assert response.get_status_code() == 200
logs_collection = response.get_result()
assert logs_collection is not None

# Store the first log ID for use in test_get_tekton_pipeline_run_log_content
if logs_collection.get('logs') and len(logs_collection['logs']) > 0:
run_log_id_link = logs_collection['logs'][0]['id']
assert run_log_id_link is not None

@needscredentials
def test_get_tekton_pipeline_run_log_content(self):
# Retry logic with exponential backoff to wait for log content to be available
response = None
max_retries = 10
base_delay = 1
max_delay = 30

for i in range(max_retries):
try:
response = self.cd_pipeline_service.get_tekton_pipeline_run_log_content(
pipeline_id=pipeline_tool_id_link, pipeline_run_id=pipeline_run_id_link, id=run_log_id_link
)
print(f"\nget_tekton_pipeline_run_log_content() was called successfully on attempt {i + 1}.")
break
except Exception as err:
if i == max_retries - 1:
raise
retry_delay = min(base_delay * (2**i), max_delay)
print(
f"\nAttempt {i + 1} calling get_tekton_pipeline_run_log_content() failed, retrying in {retry_delay}s... Error: {err}"
)
time.sleep(retry_delay)

assert response.get_status_code() == 200
step_log = response.get_result()
assert step_log is not None

@needscredentials
def test_delete_tekton_pipeline_run(self):
Expand Down
65 changes: 51 additions & 14 deletions test/integration/test_cd_toolchain_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from ibm_cloud_sdk_core import *
import os
from datetime import datetime
import time
import pytest
from environs import Env
from ibm_continuous_delivery.cd_toolchain_v2 import *
Expand Down Expand Up @@ -198,13 +199,31 @@ def test_create_toolchain_event_application_json(self):
'application_json': toolchain_event_prototype_data_application_json_model,
}

response = self.cd_toolchain_service.create_toolchain_event(
toolchain_id=toolchain_id_link,
title='My-custom-event',
description='This is my custom event',
content_type='application/json',
data=toolchain_event_prototype_data_model,
)
# Retry logic with exponential backoff to wait for event creation to succeed
response = None
max_retries = 10
base_delay = 1
max_delay = 30

for i in range(max_retries):
try:
response = self.cd_toolchain_service.create_toolchain_event(
toolchain_id=toolchain_id_link,
title='My-custom-event',
description='This is my custom event',
content_type='application/json',
data=toolchain_event_prototype_data_model,
)
print(f"\ncreate_toolchain_event() was called successfully on attempt {i + 1}.")
break
except Exception as err:
if i == max_retries - 1:
raise
retry_delay = min(base_delay * (2**i), max_delay)
print(
f"\nAttempt {i + 1} calling create_toolchain_event() failed, retrying in {retry_delay}s... Error: {err}"
)
time.sleep(retry_delay)

assert response.get_status_code() == 200
toolchain_event_post = response.get_result()
Expand All @@ -221,13 +240,31 @@ def test_create_toolchain_event_text_plain(self):
'text_plain': toolchain_event_prototype_data_text_plain_model,
}

response = self.cd_toolchain_service.create_toolchain_event(
toolchain_id=toolchain_id_link,
title='My-custom-event',
description='This is my custom event',
content_type='text/plain',
data=toolchain_event_prototype_data_model,
)
# Retry logic with exponential backoff to wait for event creation to succeed
response = None
max_retries = 10
base_delay = 1
max_delay = 30

for i in range(max_retries):
try:
response = self.cd_toolchain_service.create_toolchain_event(
toolchain_id=toolchain_id_link,
title='My-custom-event',
description='This is my custom event',
content_type='text/plain',
data=toolchain_event_prototype_data_model,
)
print(f"\ncreate_toolchain_event() was called successfully on attempt {i + 1}.")
break
except Exception as err:
if i == max_retries - 1:
raise
retry_delay = min(base_delay * (2**i), max_delay)
print(
f"\nAttempt {i + 1} calling create_toolchain_event() failed, retrying in {retry_delay}s... Error: {err}"
)
time.sleep(retry_delay)

assert response.get_status_code() == 200
toolchain_event_post = response.get_result()
Expand Down