Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 39 additions & 21 deletions backend/backend/application/sample_project/sample_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from datetime import datetime

import psycopg2
from psycopg2 import sql
from django.conf import settings
from django.core.files.base import File
from jinja2 import Environment, FileSystemLoader
Expand Down Expand Up @@ -153,7 +154,7 @@ def postgres_connection(self):
self._postgres_connection.autocommit = True
return self._postgres_connection

def execute_sql_queries(self, statements: list[str]):
def execute_sql_queries(self, statements: "list[sql.Composable]"):
"""This method is used to execute the sql queries."""
try:
cursor = self.postgres_connection.cursor()
Expand Down Expand Up @@ -185,21 +186,29 @@ def _grant_schema_permissions_on_new_db(self):
cursor = new_db_connection.cursor()

schemas = ["raw", "dev", "stg", "prod"]
user_ident = sql.Identifier(self.user_name)
for schema in schemas:
cursor.execute(f"GRANT USAGE ON SCHEMA {schema} TO {self.user_name};")
schema_ident = sql.Identifier(schema)
cursor.execute(sql.SQL("GRANT USAGE ON SCHEMA {} TO {};").format(schema_ident, user_ident))
cursor.execute(
f"GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA {schema} TO {self.user_name};"
sql.SQL("GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA {} TO {};").format(
schema_ident, user_ident
)
)
cursor.execute(
f"ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} "
f"GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO {self.user_name};"
sql.SQL("ALTER DEFAULT PRIVILEGES IN SCHEMA {} "
"GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO {};").format(
schema_ident, user_ident
)
)
# Transfer schema ownership for non-source schemas so the project user
# can DROP and recreate tables during transformation runs.
# Schema owners can drop any object within their schema in PostgreSQL.
# raw schema stays read-only (source data from template).
for schema in ["dev", "stg", "prod"]:
cursor.execute(f"ALTER SCHEMA {schema} OWNER TO {self.user_name};")
cursor.execute(
sql.SQL("ALTER SCHEMA {} OWNER TO {};").format(sql.Identifier(schema), user_ident)
)

cursor.close()
logging.info(f"Schema permissions granted on database {self.database_name}")
Expand Down Expand Up @@ -286,14 +295,15 @@ def clear_existing_db(self):
cd.delete()

# Clearing existing users and databases
terminate_session = (
f"SELECT pg_terminate_backend(pid) FROM pg_stat_activity " f"WHERE datname = '{self.database_name}';"
)
drop_database = f"drop database if exists {self.database_name};"
drop_user = f"drop user if exists {self.user_name};"
db_ident = sql.Identifier(self.database_name)
user_ident = sql.Identifier(self.user_name)
terminate_session = sql.SQL(
"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = {};"
).format(sql.Literal(self.database_name))
drop_database = sql.SQL("DROP DATABASE IF EXISTS {};").format(db_ident)
drop_user = sql.SQL("DROP USER IF EXISTS {};").format(user_ident)
sql_statements = [
terminate_session,
# revoke_privilege,
drop_database,
drop_user,
]
Expand All @@ -302,17 +312,22 @@ def clear_existing_db(self):

def create_new_database(self):
"""This method is used to create a new database."""
create_db_query = f"CREATE DATABASE {self.database_name};"
user_check_query = f"SELECT 1 FROM pg_roles WHERE rolname = {self.user_name}"
create_user_query = f"CREATE USER {self.user_name} WITH ENCRYPTED PASSWORD '{self.password}';"
grant_role_query = f"GRANT ALL PRIVILEGES ON DATABASE {self.database_name} TO {self.user_name};"
db_ident = sql.Identifier(self.database_name)
user_ident = sql.Identifier(self.user_name)
create_db_query = sql.SQL("CREATE DATABASE {};").format(db_ident)
create_user_query = sql.SQL("CREATE USER {} WITH ENCRYPTED PASSWORD {};").format(
user_ident, sql.Literal(self.password)
)
grant_role_query = sql.SQL("GRANT ALL PRIVILEGES ON DATABASE {} TO {};").format(db_ident, user_ident)
statements = [create_db_query]
if not self.user_exist():
statements.append(create_user_query)
statements.append(grant_role_query)
if self._clone_db:
logging.info(f"creating(cloning) new sample db with the name - {self.database_name}")
create_template_db_query = f"CREATE DATABASE {self.database_name} TEMPLATE {self.master_db_name};"
create_template_db_query = sql.SQL("CREATE DATABASE {} TEMPLATE {};").format(
db_ident, sql.Identifier(self.master_db_name)
)
statements[0] = create_template_db_query
try:
self.execute_sql_queries(statements=statements)
Expand Down Expand Up @@ -347,9 +362,11 @@ def user_exist(self) -> bool:
def grant_permissions(self):
"""This method is used to grant the permissions to the user and
database."""
user_ident = sql.Identifier(self.user_name)
db_ident = sql.Identifier(self.database_name)
statements = [
f"CREATE USER {self.user_name} WITH ENCRYPTED PASSWORD '{self.password}';",
f"GRANT ALL PRIVILEGES ON DATABASE {self.database_name} TO {self.user_name};",
sql.SQL("CREATE USER {} WITH ENCRYPTED PASSWORD {};").format(user_ident, sql.Literal(self.password)),
sql.SQL("GRANT ALL PRIVILEGES ON DATABASE {} TO {};").format(db_ident, user_ident),
]
self.execute_sql_queries(statements=statements)
logging.info(f"new sample db and user created with the name - {self.user_name} database - {self.database_name}")
Expand Down Expand Up @@ -410,8 +427,9 @@ def create_schemas(self):
# Create schemas using raw SQL
schemas = ["raw", "dev", "stg", "prod"]
for schema in schemas:
statements.append(f"DROP SCHEMA IF EXISTS {schema} CASCADE;")
statements.append(f"CREATE SCHEMA IF NOT EXISTS {schema};")
schema_ident = sql.Identifier(schema)
statements.append(sql.SQL("DROP SCHEMA IF EXISTS {} CASCADE;").format(schema_ident))
statements.append(sql.SQL("CREATE SCHEMA IF NOT EXISTS {};").format(schema_ident))
self.execute_sql_queries(statements=statements)
logging.info("schemas created successfully")
except Exception as e:
Expand Down
Loading