Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

import ftplib
from collections.abc import Sequence
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -60,6 +61,9 @@ class FTPToS3Operator(BaseOperator):
:param gzip: If True, the file will be compressed locally
:param acl_policy: String specifying the canned ACL policy for the file being
uploaded to the S3 bucket.
:param fail_on_file_not_exist: If True, operator fails when a source file does not
exist on the FTP server. If False, the operator logs a warning and skips the
transfer. Default is True.
"""

template_fields: Sequence[str] = ("ftp_path", "s3_bucket", "s3_key", "ftp_filenames", "s3_filenames")
Expand All @@ -78,6 +82,7 @@ def __init__(
encrypt: bool = False,
gzip: bool = False,
acl_policy: str | None = None,
fail_on_file_not_exist: bool = True,
**kwargs,
):
super().__init__(**kwargs)
Expand All @@ -92,25 +97,31 @@ def __init__(
self.encrypt = encrypt
self.gzip = gzip
self.acl_policy = acl_policy
self.fail_on_file_not_exist = fail_on_file_not_exist
self.s3_hook: S3Hook | None = None
self.ftp_hook: FTPHook | None = None

def __upload_to_s3_from_ftp(self, remote_filename, s3_file_key):
    """
    Copy one file from the FTP server to S3 through a local temporary file.

    :param remote_filename: Path of the source file on the FTP server.
    :param s3_file_key: Key the file is uploaded to in ``self.s3_bucket``.
    :raises ftplib.error_perm: If the FTP server answers the retrieval with a
        permanent error. A ``550`` reply is not raised when
        ``fail_on_file_not_exist`` is False; the transfer is skipped instead.
    """
    with NamedTemporaryFile() as local_tmp_file:
        # Guard only the FTP retrieval: the S3 upload below is not an FTP call.
        try:
            self.ftp_hook.retrieve_file(
                remote_full_path=remote_filename, local_full_path_or_buffer=local_tmp_file.name
            )
        except ftplib.error_perm as e:
            # An FTP reply starts with its three-digit code. Matching the code at
            # the start avoids treating another permanent error whose text merely
            # contains "550" (for example inside a file name) as "file not found".
            is_not_found = str(e).lstrip().startswith("550")
            if is_not_found and not self.fail_on_file_not_exist:
                self.log.warning("File %s not found on FTP server. Skipping transfer.", remote_filename)
                return
            raise

        self.s3_hook.load_file(
            filename=local_tmp_file.name,
            key=s3_file_key,
            bucket_name=self.s3_bucket,
            replace=self.replace,
            encrypt=self.encrypt,
            gzip=self.gzip,
            acl_policy=self.acl_policy,
        )
        self.log.info("File upload to %s", s3_file_key)

def execute(self, context: Context):
self.ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,43 +39,99 @@ class S3ToFTPOperator(BaseOperator):

:param s3_bucket: The targeted s3 bucket. This is the S3 bucket from
where the file is downloaded.
:param s3_key: The targeted s3 key. This is the specified file path for
downloading the file from S3.
:param ftp_path: The ftp remote path. This is the specified file path for
uploading file to the FTP server.
:param s3_key: The targeted s3 key. For a single file it must include the file
path. For multiple files it is the key prefix (directory) and must end with
``"/"``.
:param s3_filenames: Only used if you want to move multiple files. You can pass
a list with exact key suffixes present under the s3_key prefix, or a string
prefix that all filenames must match. Use ``"*"`` to move all objects under
the s3_key prefix.
:param ftp_path: The ftp remote path. For a single file it must include the file
path. For multiple files it is the destination directory path and must end
with ``"/"``.
:param ftp_filenames: Only used if you want to move multiple files and name them
differently at the destination. It can be a list of filenames or a string
prefix that replaces the s3 prefix.
:param aws_conn_id: reference to a specific AWS connection
:param ftp_conn_id: The ftp connection id. The name or identifier for
establishing a connection to the FTP server.
:param fail_on_file_not_exist: If True, operator fails when a source S3 key does not
exist. If False, the operator logs a warning and skips the transfer. Default is True.
"""

template_fields: Sequence[str] = ("s3_bucket", "s3_key", "ftp_path")
template_fields: Sequence[str] = ("s3_bucket", "s3_key", "ftp_path", "s3_filenames", "ftp_filenames")

def __init__(
    self,
    *,
    s3_bucket,
    s3_key,
    ftp_path,
    s3_filenames: str | list[str] | None = None,
    ftp_filenames: str | list[str] | None = None,
    aws_conn_id="aws_default",
    ftp_conn_id="ftp_default",
    fail_on_file_not_exist: bool = True,
    **kwargs,
) -> None:
    """Record the transfer settings on the instance; remaining kwargs go to the base class."""
    super().__init__(**kwargs)

    # Connection identifiers.
    self.aws_conn_id = aws_conn_id
    self.ftp_conn_id = ftp_conn_id

    # Source side (S3).
    self.s3_bucket = s3_bucket
    self.s3_key = s3_key
    self.s3_filenames = s3_filenames

    # Destination side (FTP).
    self.ftp_path = ftp_path
    self.ftp_filenames = ftp_filenames

    # Behaviour when a source key is missing.
    self.fail_on_file_not_exist = fail_on_file_not_exist

def _download_from_s3(self, s3_hook: S3Hook, ftp_hook: FTPHook, s3_key: str, ftp_path: str) -> None:
    """
    Copy a single S3 object to the FTP server through a local temporary file.

    :param s3_hook: Hook used to check for and download the object.
    :param ftp_hook: Hook used to store the file on the FTP server.
    :param s3_key: Key of the object in ``self.s3_bucket``.
    :param ftp_path: Destination path on the FTP server.
    :raises FileNotFoundError: If the key does not exist and
        ``fail_on_file_not_exist`` is True.
    """
    if not s3_hook.check_for_key(s3_key, self.s3_bucket):
        if self.fail_on_file_not_exist:
            raise FileNotFoundError(f"Key {s3_key!r} not found in S3 bucket {self.s3_bucket!r}")
        # The operator documents this case as a warning, so log it at that level.
        self.log.warning("Key %s not found in S3. Skipping transfer.", s3_key)
        return

    s3_obj = s3_hook.get_key(s3_key, self.s3_bucket)
    with NamedTemporaryFile() as local_tmp_file:
        self.log.info("Downloading file from %s", s3_key)
        s3_obj.download_fileobj(local_tmp_file)
        # The file is handed to store_file by path, not as the open handle, so
        # the buffered bytes have to be written out before it is read back.
        local_tmp_file.flush()
        ftp_hook.store_file(ftp_path, local_tmp_file.name)
        self.log.info("File stored in %s", ftp_path)

def execute(self, context: Context):
    """
    Transfer one or several S3 objects to the FTP server.

    Three modes are selected by ``s3_filenames``:

    * empty / ``None``: ``s3_key`` and ``ftp_path`` are complete paths of one file;
    * a string: keys under the ``s3_key`` prefix are listed and every name
      containing that string (or every name, for ``"*"``) is transferred;
    * a list: each entry is appended to ``s3_key`` and transferred.

    :param context: Task execution context (unused here).
    :raises ValueError: If ``s3_filenames`` is a list and ``ftp_filenames`` is a
        string or a list of a different length.
    """
    s3_hook = S3Hook(self.aws_conn_id)
    ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)

    # Single-file mode.
    if not self.s3_filenames:
        self._download_from_s3(s3_hook, ftp_hook, self.s3_key, self.ftp_path)
        return

    # Listing mode: s3_filenames is a match string (or "*").
    if isinstance(self.s3_filenames, str):
        self.log.info("Getting files in s3://%s/%s", self.s3_bucket, self.s3_key)
        all_keys = s3_hook.list_keys(bucket_name=self.s3_bucket, prefix=self.s3_key) or []
        prefix_len = len(self.s3_key)
        # A key equal to the prefix would give an empty name and make the
        # destination ftp_path itself, so only keep keys longer than the prefix.
        files = [k[prefix_len:] for k in all_keys if len(k) > prefix_len]
        if self.s3_filenames != "*":
            # NOTE(review): this is a substring match although the parameter is
            # documented as a prefix - confirm which one is intended.
            files = [f for f in files if self.s3_filenames in f]

        rename = bool(self.ftp_filenames) and isinstance(self.ftp_filenames, str)
        for file in files:
            self.log.info("Moving file %s", file)
            ftp_filename = file.replace(self.s3_filenames, self.ftp_filenames) if rename else file
            self._download_from_s3(s3_hook, ftp_hook, self.s3_key + file, self.ftp_path + ftp_filename)
        return

    # Explicit-list mode.
    s3_files = list(self.s3_filenames)
    if not self.ftp_filenames:
        ftp_files = s3_files
    elif isinstance(self.ftp_filenames, str):
        # zip() over a string would pair the keys with single characters.
        raise ValueError("ftp_filenames must be a list when s3_filenames is a list")
    else:
        ftp_files = list(self.ftp_filenames)
        if len(ftp_files) != len(s3_files):
            # zip() would silently drop the unmatched entries.
            raise ValueError(
                f"s3_filenames has {len(s3_files)} entries but ftp_filenames has {len(ftp_files)}"
            )

    for s3_file, ftp_file in zip(s3_files, ftp_files):
        self._download_from_s3(s3_hook, ftp_hook, self.s3_key + s3_file, self.ftp_path + ftp_file)
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
from airflow.providers.ssh.hooks.ssh import SSHHook

if TYPE_CHECKING:
import paramiko

from airflow.sdk import Context


Expand All @@ -40,8 +42,9 @@ class S3ToSFTPOperator(BaseOperator):

:param sftp_conn_id: The sftp connection id. The name or identifier for
establishing a connection to the SFTP server.
:param sftp_path: The sftp remote path. This is the specified file path for
uploading file to the SFTP server.
:param sftp_path: The sftp remote path. For a single file it must include the
file path. For multiple files it is the destination directory path and must
end with ``"/"``.
:param sftp_remote_host: The remote host of the SFTP server. Overrides host in
Connection.
:param aws_conn_id: The Airflow connection used for AWS credentials.
Expand All @@ -51,14 +54,24 @@ class S3ToSFTPOperator(BaseOperator):
maintained on each worker node).
:param s3_bucket: The targeted s3 bucket. This is the S3 bucket from
where the file is downloaded.
:param s3_key: The targeted s3 key. This is the specified file path for
downloading the file from S3.
:param s3_key: The targeted s3 key. For a single file it must include the file
path. For multiple files it is the key prefix (directory) and must end with
``"/"``.
:param s3_filenames: Only used if you want to move multiple files. You can pass
a list with exact key suffixes present under the s3_key prefix, or a string
prefix that all filenames must match. Use ``"*"`` to move all objects under
the s3_key prefix.
:param sftp_filenames: Only used if you want to move multiple files and name them
differently at the destination. It can be a list of filenames or a string
prefix that replaces the s3 prefix.
:param confirm: specify if the SFTP operation should be confirmed, defaults to True.
When True, a stat will be performed on the remote file after upload to verify
the file size matches and confirm successful transfer.
:param fail_on_file_not_exist: If True, operator fails when a source S3 key does not
exist. If False, the operator logs a warning and skips the transfer. Default is True.
"""

template_fields: Sequence[str] = ("s3_key", "sftp_path", "s3_bucket")
template_fields: Sequence[str] = ("s3_key", "sftp_path", "s3_bucket", "s3_filenames", "sftp_filenames")

def __init__(
self,
Expand All @@ -69,7 +82,10 @@ def __init__(
sftp_conn_id: str = "ssh_default",
sftp_remote_host: str = "",
aws_conn_id: str | None = "aws_default",
s3_filenames: str | list[str] | None = None,
sftp_filenames: str | list[str] | None = None,
confirm: bool = True,
fail_on_file_not_exist: bool = True,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -79,24 +95,80 @@ def __init__(
self.s3_key = s3_key
self.sftp_remote_host = sftp_remote_host
self.aws_conn_id = aws_conn_id
self.s3_filenames = s3_filenames
self.sftp_filenames = sftp_filenames
self.confirm = confirm
self.fail_on_file_not_exist = fail_on_file_not_exist

@staticmethod
def get_s3_key(s3_key: str) -> str:
"""Parse the correct format for S3 keys regardless of how the S3 url is passed."""
parsed_s3_key = urlsplit(s3_key)
return parsed_s3_key.path.lstrip("/")

def _download_from_s3(
    self,
    sftp_client: paramiko.SFTPClient,
    s3_hook: S3Hook,
    s3_key: str,
    sftp_path: str,
) -> None:
    """
    Copy a single S3 object to the SFTP server through a local temporary file.

    :param sftp_client: Open SFTP client used for the upload.
    :param s3_hook: Hook used to check for and download the object.
    :param s3_key: Key of the object in ``self.s3_bucket``.
    :param sftp_path: Destination path on the SFTP server.
    :raises FileNotFoundError: If the key does not exist and
        ``fail_on_file_not_exist`` is True.
    """
    if not s3_hook.check_for_key(s3_key, self.s3_bucket):
        if self.fail_on_file_not_exist:
            raise FileNotFoundError(f"Key {s3_key!r} not found in S3 bucket {self.s3_bucket!r}")
        # The operator documents this case as a warning, so log it at that level.
        self.log.warning("Key %s not found in S3. Skipping transfer.", s3_key)
        return

    with NamedTemporaryFile("w") as f:
        # Both calls work on the temporary file's path, not on the open handle.
        s3_hook.get_conn().download_file(self.s3_bucket, s3_key, f.name)
        sftp_client.put(f.name, sftp_path, confirm=self.confirm)

def execute(self, context: Context) -> None:
    """
    Transfer one or several S3 objects to the SFTP server.

    ``self.s3_key`` is first normalised with ``get_s3_key``. The SSH and SFTP
    clients opened here are closed again when the transfers finish or fail.

    :param context: Task execution context (unused here).
    :raises ValueError: If ``s3_filenames`` is a list and ``sftp_filenames`` is a
        string or a list of a different length.
    """
    self.s3_key = self.get_s3_key(self.s3_key)

    # SSHHook will handle a None/"" sftp_remote_host
    ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id, remote_host=self.sftp_remote_host)
    s3_hook = S3Hook(self.aws_conn_id)

    ssh_client = ssh_hook.get_conn()
    try:
        sftp_client = ssh_client.open_sftp()
        try:
            self._transfer_files(sftp_client, s3_hook)
        finally:
            sftp_client.close()
    finally:
        ssh_client.close()

def _transfer_files(self, sftp_client: paramiko.SFTPClient, s3_hook: S3Hook) -> None:
    """Work out the (S3 key, SFTP path) pairs from the operator settings and copy each one."""
    # Single-file mode: s3_key and sftp_path are complete paths.
    if not self.s3_filenames:
        self._download_from_s3(sftp_client, s3_hook, self.s3_key, self.sftp_path)
        return

    # Listing mode: s3_filenames is a match string (or "*").
    if isinstance(self.s3_filenames, str):
        self.log.info("Getting files in s3://%s/%s", self.s3_bucket, self.s3_key)
        all_keys = s3_hook.list_keys(bucket_name=self.s3_bucket, prefix=self.s3_key) or []
        prefix_len = len(self.s3_key)
        # A key equal to the prefix would give an empty name and make the
        # destination sftp_path itself, so only keep keys longer than the prefix.
        files = [k[prefix_len:] for k in all_keys if len(k) > prefix_len]
        if self.s3_filenames != "*":
            # NOTE(review): this is a substring match although the parameter is
            # documented as a prefix - confirm which one is intended.
            files = [f for f in files if self.s3_filenames in f]

        rename = bool(self.sftp_filenames) and isinstance(self.sftp_filenames, str)
        for file in files:
            self.log.info("Moving file %s", file)
            sftp_filename = file.replace(self.s3_filenames, self.sftp_filenames) if rename else file
            self._download_from_s3(
                sftp_client,
                s3_hook,
                self.s3_key + file,
                self.sftp_path + sftp_filename,
            )
        return

    # Explicit-list mode.
    s3_files = list(self.s3_filenames)
    if not self.sftp_filenames:
        sftp_files = s3_files
    elif isinstance(self.sftp_filenames, str):
        # zip() over a string would pair the keys with single characters.
        raise ValueError("sftp_filenames must be a list when s3_filenames is a list")
    else:
        sftp_files = list(self.sftp_filenames)
        if len(sftp_files) != len(s3_files):
            # zip() would silently drop the unmatched entries.
            raise ValueError(
                f"s3_filenames has {len(s3_files)} entries but sftp_filenames has {len(sftp_files)}"
            )

    for s3_file, sftp_file in zip(s3_files, sftp_files):
        self._download_from_s3(
            sftp_client,
            s3_hook,
            self.s3_key + s3_file,
            self.sftp_path + sftp_file,
        )
Loading
Loading