diff --git a/src/pathsim_chem/__init__.py b/src/pathsim_chem/__init__.py index a879933..39dc189 100644 --- a/src/pathsim_chem/__init__.py +++ b/src/pathsim_chem/__init__.py @@ -15,3 +15,4 @@ #for direct block import from main package from .tritium import * from .thermodynamics import * +from .process import * diff --git a/src/pathsim_chem/process/__init__.py b/src/pathsim_chem/process/__init__.py new file mode 100644 index 0000000..daf192b --- /dev/null +++ b/src/pathsim_chem/process/__init__.py @@ -0,0 +1,10 @@ +######################################################################################### +## +## Dynamic Unit Operation Models for Chemical Processes +## +######################################################################################### + +from .cstr import * +from .heat_exchanger import * +from .flash_drum import * +from .distillation import * diff --git a/src/pathsim_chem/process/cstr.py b/src/pathsim_chem/process/cstr.py new file mode 100644 index 0000000..6ce6672 --- /dev/null +++ b/src/pathsim_chem/process/cstr.py @@ -0,0 +1,155 @@ +######################################################################################### +## +## Continuous Stirred-Tank Reactor (CSTR) Block +## +######################################################################################### + +# IMPORTS =============================================================================== + +import numpy as np + +from pathsim.blocks.dynsys import DynamicalSystem + +# CONSTANTS ============================================================================= + +R_GAS = 8.314 # universal gas constant [J/(mol·K)] + +# BLOCKS ================================================================================ + +class CSTR(DynamicalSystem): + """Continuous stirred-tank reactor with Arrhenius kinetics and energy balance. + + Models a well-mixed tank where a single reaction A -> products occurs with + nth-order kinetics. The reaction rate follows the Arrhenius temperature + dependence. An external coolant jacket provides or removes heat. + + Mathematical Formulation + ------------------------- + The state vector is :math:`[C_A, T]` where :math:`C_A` is the concentration + of species A and :math:`T` is the reactor temperature. + + .. math:: + + \\frac{dC_A}{dt} = \\frac{C_{A,in} - C_A}{\\tau} - k(T) \\, C_A^n + + .. math:: + + \\frac{dT}{dt} = \\frac{T_{in} - T}{\\tau} + + \\frac{(-\\Delta H_{rxn})}{\\rho \\, C_p} \\, k(T) \\, C_A^n + + \\frac{UA}{\\rho \\, C_p \\, V} \\, (T_c - T) + + where the Arrhenius rate constant is: + + .. math:: + + k(T) = k_0 \\, \\exp\\!\\left(-\\frac{E_a}{R \\, T}\\right) + + and the residence time is :math:`\\tau = V / F`. + + Parameters + ---------- + V : float + Reactor volume [m³]. + F : float + Volumetric flow rate [m³/s]. + k0 : float + Pre-exponential Arrhenius factor [1/s for n=1, (m³/mol)^(n-1)/s]. + Ea : float + Activation energy [J/mol]. + n : float + Reaction order with respect to species A [-]. + dH_rxn : float + Heat of reaction [J/mol]. Negative for exothermic reactions. + rho : float + Fluid density [kg/m³]. + Cp : float + Fluid heat capacity [J/(kg·K)]. + UA : float + Overall heat transfer coefficient times area [W/K]. + C_A0 : float + Initial concentration of A [mol/m³]. + T0 : float + Initial reactor temperature [K]. + """ + + input_port_labels = { + "C_in": 0, + "T_in": 1, + "T_c": 2, + } + + output_port_labels = { + "C_out": 0, + "T_out": 1, + } + + def __init__(self, V=1.0, F=0.1, k0=1e6, Ea=50000.0, n=1.0, + dH_rxn=-50000.0, rho=1000.0, Cp=4184.0, UA=500.0, + C_A0=0.0, T0=300.0): + + # input validation + if V <= 0: + raise ValueError(f"'V' must be positive but is {V}") + if F <= 0: + raise ValueError(f"'F' must be positive but is {F}") + if rho <= 0: + raise ValueError(f"'rho' must be positive but is {rho}") + if Cp <= 0: + raise ValueError(f"'Cp' must be positive but is {Cp}") + + # store parameters + self.V = V + self.F = F + self.k0 = k0 + self.Ea = Ea + self.n = n + self.dH_rxn = dH_rxn + self.rho = rho + self.Cp = Cp + self.UA = UA + + # rhs of CSTR ode system + def _fn_d(x, u, t): + C_A, T = x + C_A_in, T_in, T_c = u + + tau = self.V / self.F + k = self.k0 * np.exp(-self.Ea / (R_GAS * T)) + r = k * C_A**self.n + rcp = (-self.dH_rxn) / (self.rho * self.Cp) + ua_term = self.UA / (self.rho * self.Cp * self.V) + + dC_A = (C_A_in - C_A) / tau - r + dT = (T_in - T) / tau + rcp * r + ua_term * (T_c - T) + + return np.array([dC_A, dT]) + + # jacobian of rhs wrt state [C_A, T] + def _jc_d(x, u, t): + C_A, T = x + + tau = self.V / self.F + k = self.k0 * np.exp(-self.Ea / (R_GAS * T)) + dk_dT = k * self.Ea / (R_GAS * T**2) + + dr_dCA = k * self.n * C_A**(self.n - 1) if C_A > 0 else 0.0 + dr_dT = dk_dT * C_A**self.n + + rcp = (-self.dH_rxn) / (self.rho * self.Cp) + ua_term = self.UA / (self.rho * self.Cp * self.V) + + return np.array([ + [-1.0/tau - dr_dCA, -dr_dT], + [rcp * dr_dCA, -1.0/tau + rcp * dr_dT - ua_term] + ]) + + # output function: well-mixed => outlet = state + def _fn_a(x, u, t): + return x.copy() + + super().__init__( + func_dyn=_fn_d, + jac_dyn=_jc_d, + func_alg=_fn_a, + initial_value=np.array([C_A0, T0]), + ) diff --git a/src/pathsim_chem/process/distillation.py b/src/pathsim_chem/process/distillation.py new file mode 100644 index 0000000..1656f06 --- /dev/null +++ b/src/pathsim_chem/process/distillation.py @@ -0,0 +1,119 @@ +######################################################################################### +## +## Single Equilibrium Distillation Tray Block +## +######################################################################################### + +# IMPORTS =============================================================================== + +import numpy as np + +from pathsim.blocks.dynsys import DynamicalSystem + +# BLOCKS ================================================================================ + +class DistillationTray(DynamicalSystem): + """Single equilibrium distillation tray with constant molar overflow. + + Models one tray of a distillation column under the constant molar + overflow (CMO) assumption. Liquid flows down from the tray above, + vapor rises from the tray below. VLE is computed using constant + relative volatility. + + Multiple trays can be wired in series via ``Connection`` objects to + build a full distillation column. + + Mathematical Formulation + ------------------------- + For a binary system with mole fraction :math:`x` of the light component + on the tray: + + .. math:: + + M \\frac{dx}{dt} = L_{in} \\, x_{in} + V_{in} \\, y_{in} + - L_{out} \\, x - V_{out} \\, y + + where VLE with constant relative volatility :math:`\\alpha` gives: + + .. math:: + + y = \\frac{\\alpha \\, x}{1 + (\\alpha - 1) \\, x} + + Under CMO: :math:`L_{out} = L_{in}` and :math:`V_{out} = V_{in}`. + + Parameters + ---------- + M : float + Liquid holdup on the tray [mol]. + alpha : float + Relative volatility of light to heavy component [-]. + x0 : float + Initial liquid mole fraction of light component [-]. + """ + + input_port_labels = { + "L_in": 0, + "x_in": 1, + "V_in": 2, + "y_in": 3, + } + + output_port_labels = { + "L_out": 0, + "x_out": 1, + "V_out": 2, + "y_out": 3, + } + + def __init__(self, M=1.0, alpha=2.5, x0=0.5): + + # input validation + if M <= 0: + raise ValueError(f"'M' must be positive but is {M}") + if alpha <= 0: + raise ValueError(f"'alpha' must be positive but is {alpha}") + if not 0.0 <= x0 <= 1.0: + raise ValueError(f"'x0' must be in [0, 1] but is {x0}") + + self.M = M + self.alpha = alpha + + # VLE: y = alpha*x / (1 + (alpha-1)*x) + def _vle(x_val): + return self.alpha * x_val / (1.0 + (self.alpha - 1.0) * x_val) + + # rhs of tray ode + def _fn_d(x, u, t): + L_in, x_in, V_in, y_in = u + + x_tray = x[0] + y_tray = _vle(x_tray) + + # CMO: L_out = L_in, V_out = V_in + dx = (L_in * x_in + V_in * y_in - L_in * x_tray - V_in * y_tray) / self.M + return np.array([dx]) + + # jacobian wrt x + def _jc_d(x, u, t): + L_in, x_in, V_in, y_in = u + x_tray = x[0] + + # dy/dx = alpha / (1 + (alpha-1)*x)^2 + denom = (1.0 + (self.alpha - 1.0) * x_tray)**2 + dy_dx = self.alpha / denom + + return np.array([[-L_in / self.M - V_in * dy_dx / self.M]]) + + # output: L_out, x, V_out, y (CMO: pass-through flows) + def _fn_a(x, u, t): + L_in, x_in, V_in, y_in = u + x_tray = x[0] + y_tray = _vle(x_tray) + return np.array([L_in, x_tray, V_in, y_tray]) + + super().__init__( + func_dyn=_fn_d, + jac_dyn=_jc_d, + func_alg=_fn_a, + initial_value=np.array([x0]), + ) diff --git a/src/pathsim_chem/process/flash_drum.py b/src/pathsim_chem/process/flash_drum.py new file mode 100644 index 0000000..33c5f43 --- /dev/null +++ b/src/pathsim_chem/process/flash_drum.py @@ -0,0 +1,161 @@ +######################################################################################### +## +## Binary Isothermal Flash Drum Block +## +######################################################################################### + +# IMPORTS =============================================================================== + +import numpy as np + +from pathsim.blocks.dynsys import DynamicalSystem + +# BLOCKS ================================================================================ + +class FlashDrum(DynamicalSystem): + """Binary isothermal flash drum with Raoult's law vapor-liquid equilibrium. + + Models a flash drum with liquid holdup for a binary mixture. Feed enters as + liquid, vapor and liquid streams exit. Temperature and pressure are specified + as inputs. VLE is computed algebraically using K-values from Raoult's law: + + .. math:: + + K_i = \\frac{P_{sat,i}(T)}{P} + + where the Antoine equation gives the saturation pressure: + + .. math:: + + \\ln P_{sat,i} = A_i - \\frac{B_i}{T + C_i} + + The Rachford-Rice equation determines the vapor fraction :math:`\\beta`: + + .. math:: + + \\sum_i \\frac{z_i (K_i - 1)}{1 + \\beta (K_i - 1)} = 0 + + For a binary system this has the analytical solution: + + .. math:: + + \\beta = -\\frac{z_1(K_1 - 1) + z_2(K_2 - 1)}{(K_1 - 1)(K_2 - 1)} + + Dynamic States + --------------- + The holdup moles of each component in the liquid phase: + + .. math:: + + \\frac{dN_i}{dt} = F z_i - V y_i - L x_i + + Parameters + ---------- + holdup : float + Total liquid holdup [mol]. Assumed approximately constant. + antoine_A : array_like + Antoine A parameters for each component [ln(Pa)]. + antoine_B : array_like + Antoine B parameters for each component [K]. + antoine_C : array_like + Antoine C parameters for each component [K]. + N0 : array_like or None + Initial component holdup moles [mol]. If None, equal split assumed. + """ + + input_port_labels = { + "F": 0, + "z_1": 1, + "T": 2, + "P": 3, + } + + output_port_labels = { + "V_rate": 0, + "L_rate": 1, + "y_1": 2, + "x_1": 3, + } + + def __init__(self, holdup=100.0, + antoine_A=None, antoine_B=None, antoine_C=None, + N0=None): + + # input validation + if holdup <= 0: + raise ValueError(f"'holdup' must be positive but is {holdup}") + + self.holdup = holdup + + # default Antoine parameters: benzene / toluene (Pa, K) + self.antoine_A = np.array(antoine_A) if antoine_A is not None else np.array([15.9008, 16.0137]) + self.antoine_B = np.array(antoine_B) if antoine_B is not None else np.array([2788.51, 3096.52]) + self.antoine_C = np.array(antoine_C) if antoine_C is not None else np.array([-52.36, -53.67]) + + if len(self.antoine_A) != 2: + raise ValueError("Binary flash: Antoine parameters must have length 2") + + # initial holdup (equal moles by default) + if N0 is not None: + x0 = np.array(N0, dtype=float) + else: + x0 = np.array([holdup / 2.0, holdup / 2.0]) + + # shared VLE computation + def _solve_vle(z, T, P): + """Solve binary Rachford-Rice analytically, return (beta, y, x).""" + P_sat = np.exp(self.antoine_A - self.antoine_B / (T + self.antoine_C)) + K = P_sat / P + + d1 = K[0] - 1.0 + d2 = K[1] - 1.0 + + if abs(d1) < 1e-12 and abs(d2) < 1e-12: + beta = 0.0 + else: + den = d1 * d2 + beta = -(z[0] * d1 + z[1] * d2) / den if abs(den) > 1e-12 else 0.0 + + beta = np.clip(beta, 0.0, 1.0) + + y = z * K / (1.0 + beta * (K - 1.0)) + x_eq = z / (1.0 + beta * (K - 1.0)) + + # normalize for numerical safety + y_s, x_s = y.sum(), x_eq.sum() + if y_s > 0: + y = y / y_s + if x_s > 0: + x_eq = x_eq / x_s + + return beta, y, x_eq + + # rhs of flash drum ode + def _fn_d(x, u, t): + F_in, z_1, T, P = u + z = np.array([z_1, 1.0 - z_1]) + + beta, y, x_eq = _solve_vle(z, T, P) + + V_rate = beta * F_in + L_rate = (1.0 - beta) * F_in + + return F_in * z - V_rate * y - L_rate * x_eq + + # algebraic output + def _fn_a(x, u, t): + F_in, z_1, T, P = u + z = np.array([z_1, 1.0 - z_1]) + + beta, y, x_eq = _solve_vle(z, T, P) + + V_rate = beta * F_in + L_rate = (1.0 - beta) * F_in + + return np.array([V_rate, L_rate, y[0], x_eq[0]]) + + super().__init__( + func_dyn=_fn_d, + func_alg=_fn_a, + initial_value=x0, + ) diff --git a/src/pathsim_chem/process/heat_exchanger.py b/src/pathsim_chem/process/heat_exchanger.py new file mode 100644 index 0000000..2e5465c --- /dev/null +++ b/src/pathsim_chem/process/heat_exchanger.py @@ -0,0 +1,194 @@ +######################################################################################### +## +## Counter-Current Shell & Tube Heat Exchanger Block +## +######################################################################################### + +# IMPORTS =============================================================================== + +import numpy as np + +from pathsim.blocks.dynsys import DynamicalSystem + +# BLOCKS ================================================================================ + +class HeatExchanger(DynamicalSystem): + """Counter-current shell and tube heat exchanger with discretized cells. + + The exchanger is divided into N cells along its length. The hot stream + flows from cell 1 to N, the cold stream flows from cell N to 1 + (counter-current). Each cell exchanges heat proportional to the local + temperature difference. + + Mathematical Formulation + ------------------------- + For each cell :math:`i = 1, \\ldots, N`: + + .. math:: + + \\frac{dT_{h,i}}{dt} = \\frac{F_h}{V_{cell,h}} (T_{h,i-1} - T_{h,i}) + - \\frac{UA_{cell}}{\\rho_h C_{p,h} V_{cell,h}} (T_{h,i} - T_{c,i}) + + .. math:: + + \\frac{dT_{c,i}}{dt} = \\frac{F_c}{V_{cell,c}} (T_{c,i+1} - T_{c,i}) + + \\frac{UA_{cell}}{\\rho_c C_{p,c} V_{cell,c}} (T_{h,i} - T_{c,i}) + + where :math:`T_{h,0} = T_{h,in}` and :math:`T_{c,N+1} = T_{c,in}`. + + The state vector is ordered as + :math:`[T_{h,1}, T_{c,1}, T_{h,2}, T_{c,2}, \\ldots, T_{h,N}, T_{c,N}]`. + + Parameters + ---------- + N_cells : int + Number of discretization cells along the exchanger [-]. + F_h : float + Hot stream volumetric flow rate [m³/s]. + F_c : float + Cold stream volumetric flow rate [m³/s]. + V_h : float + Total hot-side volume [m³]. + V_c : float + Total cold-side volume [m³]. + UA : float + Total overall heat transfer coefficient times area [W/K]. + rho_h : float + Hot stream density [kg/m³]. + Cp_h : float + Hot stream heat capacity [J/(kg·K)]. + rho_c : float + Cold stream density [kg/m³]. + Cp_c : float + Cold stream heat capacity [J/(kg·K)]. + T_h0 : float + Initial hot-side temperature [K]. + T_c0 : float + Initial cold-side temperature [K]. + """ + + input_port_labels = { + "T_h_in": 0, + "T_c_in": 1, + } + + output_port_labels = { + "T_h_out": 0, + "T_c_out": 1, + } + + def __init__(self, N_cells=5, F_h=0.1, F_c=0.1, V_h=0.5, V_c=0.5, + UA=500.0, rho_h=1000.0, Cp_h=4184.0, rho_c=1000.0, Cp_c=4184.0, + T_h0=370.0, T_c0=300.0): + + # input validation + if N_cells < 1: + raise ValueError(f"'N_cells' must be >= 1 but is {N_cells}") + if F_h <= 0: + raise ValueError(f"'F_h' must be positive but is {F_h}") + if F_c <= 0: + raise ValueError(f"'F_c' must be positive but is {F_c}") + if V_h <= 0: + raise ValueError(f"'V_h' must be positive but is {V_h}") + if V_c <= 0: + raise ValueError(f"'V_c' must be positive but is {V_c}") + + # store parameters + self.N_cells = int(N_cells) + self.F_h = F_h + self.F_c = F_c + self.V_h = V_h + self.V_c = V_c + self.UA = UA + self.rho_h = rho_h + self.Cp_h = Cp_h + self.rho_c = rho_c + self.Cp_c = Cp_c + + N = self.N_cells + + # initial state: interleaved [T_h1, T_c1, T_h2, T_c2, ...] + x0 = np.empty(2 * N) + x0[0::2] = T_h0 + x0[1::2] = T_c0 + + # rhs of heat exchanger ode (vectorized) + def _fn_d(x, u, t): + T_h_in, T_c_in = u + N = self.N_cells + + V_cell_h = self.V_h / N + V_cell_c = self.V_c / N + UA_cell = self.UA / N + + fh = self.F_h / V_cell_h + fc = self.F_c / V_cell_c + ah = UA_cell / (self.rho_h * self.Cp_h * V_cell_h) + ac = UA_cell / (self.rho_c * self.Cp_c * V_cell_c) + + T_h = x[0::2] + T_c = x[1::2] + + # upstream temperatures with boundary conditions + T_h_prev = np.empty(N) + T_h_prev[0] = T_h_in + T_h_prev[1:] = T_h[:-1] + + T_c_next = np.empty(N) + T_c_next[-1] = T_c_in + T_c_next[:-1] = T_c[1:] + + dx = np.empty(2 * N) + dx[0::2] = fh * (T_h_prev - T_h) - ah * (T_h - T_c) + dx[1::2] = fc * (T_c_next - T_c) + ac * (T_h - T_c) + + return dx + + # analytical jacobian (block-tridiagonal structure) + def _jc_d(x, u, t): + N = self.N_cells + + V_cell_h = self.V_h / N + V_cell_c = self.V_c / N + UA_cell = self.UA / N + + fh = self.F_h / V_cell_h + fc = self.F_c / V_cell_c + ah = UA_cell / (self.rho_h * self.Cp_h * V_cell_h) + ac = UA_cell / (self.rho_c * self.Cp_c * V_cell_c) + + dim = 2 * N + J = np.zeros((dim, dim)) + + for i in range(N): + hi = 2 * i # hot index + ci = 2 * i + 1 # cold index + + # dT_h_i/dT_h_i, dT_h_i/dT_c_i + J[hi, hi] = -fh - ah + J[hi, ci] = ah + + # dT_c_i/dT_h_i, dT_c_i/dT_c_i + J[ci, hi] = ac + J[ci, ci] = -fc - ac + + # dT_h_i/dT_h_{i-1} (hot upstream coupling) + if i > 0: + J[hi, 2*(i-1)] = fh + + # dT_c_i/dT_c_{i+1} (cold upstream coupling, counter-current) + if i < N - 1: + J[ci, 2*(i+1) + 1] = fc + + return J + + # output: hot outlet = last cell hot, cold outlet = first cell cold + def _fn_a(x, u, t): + return np.array([x[2*(self.N_cells - 1)], x[1]]) + + super().__init__( + func_dyn=_fn_d, + jac_dyn=_jc_d, + func_alg=_fn_a, + initial_value=x0, + ) diff --git a/tests/process/__init__.py b/tests/process/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/process/test_cstr.py b/tests/process/test_cstr.py new file mode 100644 index 0000000..ddb63fe --- /dev/null +++ b/tests/process/test_cstr.py @@ -0,0 +1,123 @@ +######################################################################################## +## +## TESTS FOR +## 'process.cstr.py' +## +######################################################################################## + +# IMPORTS ============================================================================== + +import unittest +import numpy as np + +from pathsim_chem.process import CSTR + +from pathsim.solvers import EUF + + +# TESTS ================================================================================ + +class TestCSTR(unittest.TestCase): + """Test the CSTR (Continuous Stirred-Tank Reactor) block.""" + + def test_init_default(self): + """Test default initialization.""" + C = CSTR() + self.assertEqual(C.V, 1.0) + self.assertEqual(C.F, 0.1) + self.assertEqual(C.k0, 1e6) + self.assertEqual(C.Ea, 50000.0) + self.assertEqual(C.n, 1.0) + self.assertEqual(C.dH_rxn, -50000.0) + self.assertEqual(C.rho, 1000.0) + self.assertEqual(C.Cp, 4184.0) + self.assertEqual(C.UA, 500.0) + + def test_init_custom(self): + """Test custom initialization.""" + C = CSTR(V=2.0, F=0.5, k0=1e4, Ea=40000.0, n=2.0, + dH_rxn=-30000.0, rho=800.0, Cp=3000.0, UA=200.0, + C_A0=1.5, T0=350.0) + self.assertEqual(C.V, 2.0) + self.assertEqual(C.F, 0.5) + + C.set_solver(EUF, parent=None) + self.assertTrue(np.allclose(C.engine.initial_value, [1.5, 350.0])) + + def test_init_validation(self): + """Test input validation.""" + with self.assertRaises(ValueError): + CSTR(V=0) + with self.assertRaises(ValueError): + CSTR(V=-1) + with self.assertRaises(ValueError): + CSTR(F=0) + with self.assertRaises(ValueError): + CSTR(rho=-1) + with self.assertRaises(ValueError): + CSTR(Cp=0) + + def test_port_labels(self): + """Test port label definitions.""" + self.assertEqual(CSTR.input_port_labels["C_in"], 0) + self.assertEqual(CSTR.input_port_labels["T_in"], 1) + self.assertEqual(CSTR.input_port_labels["T_c"], 2) + self.assertEqual(CSTR.output_port_labels["C_out"], 0) + self.assertEqual(CSTR.output_port_labels["T_out"], 1) + + def test_output_equals_state(self): + """Test that outputs equal state (well-mixed assumption).""" + C = CSTR(C_A0=2.0, T0=350.0) + C.set_solver(EUF, parent=None) + C.update(None) + + self.assertAlmostEqual(C.outputs[0], 2.0) + self.assertAlmostEqual(C.outputs[1], 350.0) + + def test_steady_state_no_reaction(self): + """At very low k0, reactor is essentially non-reactive. + Steady state should be C_out ≈ C_in, T_out ≈ T_in (when UA=0).""" + C = CSTR(V=1.0, F=1.0, k0=0.0, Ea=0.0, n=1.0, + dH_rxn=0.0, UA=0.0, C_A0=1.0, T0=350.0) + C.set_solver(EUF, parent=None) + + # Set feed conditions + C.inputs[0] = 1.0 # C_A_in + C.inputs[1] = 350.0 # T_in + C.inputs[2] = 300.0 # T_c (irrelevant when UA=0) + + # At steady state with k0=0: dC/dt = (C_in - C)/tau = 0 => C = C_in + # With initial C_A = C_in, the derivative should be zero + state = C.engine.get() + self.assertTrue(np.allclose(state, [1.0, 350.0])) + + def test_arrhenius_rate(self): + """Verify Arrhenius rate constant computation indirectly through dynamics.""" + R_GAS = 8.314 + k0, Ea, T = 1e6, 50000.0, 350.0 + k_expected = k0 * np.exp(-Ea / (R_GAS * T)) + + # At C_A=1, n=1, the reaction rate should be k + C = CSTR(V=1.0, F=1.0, k0=k0, Ea=Ea, n=1.0, + dH_rxn=0.0, UA=0.0, C_A0=1.0, T0=T) + C.set_solver(EUF, parent=None) + + C.inputs[0] = 1.0 # C_A_in = C_A => flow terms cancel + C.inputs[1] = T # T_in = T => flow terms cancel + C.inputs[2] = T # T_c = T + + # dC_A/dt = 0 (flow) - k * C_A = -k + # dT/dt = 0 since dH_rxn=0 and UA=0 + # Evaluate the dynamics function directly + x = np.array([1.0, T]) + u = np.array([1.0, T, T]) + dx = C.op_dyn(x, u, 0) + + self.assertAlmostEqual(dx[0], -k_expected, places=5) + self.assertAlmostEqual(dx[1], 0.0, places=5) + + +# RUN TESTS LOCALLY ==================================================================== + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/tests/process/test_distillation.py b/tests/process/test_distillation.py new file mode 100644 index 0000000..56f2fbe --- /dev/null +++ b/tests/process/test_distillation.py @@ -0,0 +1,155 @@ +######################################################################################## +## +## TESTS FOR +## 'process.distillation.py' +## +######################################################################################## + +# IMPORTS ============================================================================== + +import unittest +import numpy as np + +from pathsim_chem.process import DistillationTray + +from pathsim.solvers import EUF + + +# TESTS ================================================================================ + +class TestDistillationTray(unittest.TestCase): + """Test the single equilibrium distillation tray block.""" + + def test_init_default(self): + """Test default initialization.""" + D = DistillationTray() + self.assertEqual(D.M, 1.0) + self.assertEqual(D.alpha, 2.5) + + def test_init_custom(self): + """Test custom initialization.""" + D = DistillationTray(M=5.0, alpha=3.0, x0=0.3) + self.assertEqual(D.M, 5.0) + self.assertEqual(D.alpha, 3.0) + + D.set_solver(EUF, parent=None) + self.assertTrue(np.allclose(D.engine.initial_value, [0.3])) + + def test_init_validation(self): + """Test input validation.""" + with self.assertRaises(ValueError): + DistillationTray(M=0) + with self.assertRaises(ValueError): + DistillationTray(M=-1) + with self.assertRaises(ValueError): + DistillationTray(alpha=0) + with self.assertRaises(ValueError): + DistillationTray(alpha=-1) + with self.assertRaises(ValueError): + DistillationTray(x0=-0.1) + with self.assertRaises(ValueError): + DistillationTray(x0=1.1) + + def test_port_labels(self): + """Test port label definitions.""" + self.assertEqual(DistillationTray.input_port_labels["L_in"], 0) + self.assertEqual(DistillationTray.input_port_labels["x_in"], 1) + self.assertEqual(DistillationTray.input_port_labels["V_in"], 2) + self.assertEqual(DistillationTray.input_port_labels["y_in"], 3) + self.assertEqual(DistillationTray.output_port_labels["L_out"], 0) + self.assertEqual(DistillationTray.output_port_labels["x_out"], 1) + self.assertEqual(DistillationTray.output_port_labels["V_out"], 2) + self.assertEqual(DistillationTray.output_port_labels["y_out"], 3) + + def test_vle_relationship(self): + """Test that y = alpha*x / (1 + (alpha-1)*x).""" + alpha = 2.5 + x0 = 0.4 + D = DistillationTray(alpha=alpha, x0=x0) + D.set_solver(EUF, parent=None) + + D.inputs[0] = 1.0 # L_in + D.inputs[1] = 0.5 # x_in + D.inputs[2] = 1.0 # V_in + D.inputs[3] = 0.5 # y_in + + D.update(None) + + y_expected = alpha * x0 / (1.0 + (alpha - 1.0) * x0) + + self.assertAlmostEqual(D.outputs[1], x0) # x_out = state + self.assertAlmostEqual(D.outputs[3], y_expected) # y_out = VLE + + def test_cmo_flow_passthrough(self): + """Under CMO, L_out = L_in and V_out = V_in.""" + D = DistillationTray() + D.set_solver(EUF, parent=None) + + L_in, V_in = 2.0, 3.0 + D.inputs[0] = L_in + D.inputs[1] = 0.5 + D.inputs[2] = V_in + D.inputs[3] = 0.5 + + D.update(None) + + self.assertAlmostEqual(D.outputs[0], L_in) # L_out + self.assertAlmostEqual(D.outputs[2], V_in) # V_out + + def test_mass_balance_steady_state(self): + """At steady state, input = output (mass balance).""" + D = DistillationTray(alpha=2.5, x0=0.5) + D.set_solver(EUF, parent=None) + + L_in, V_in = 1.0, 1.0 + x_in, y_in = 0.6, 0.4 + + D.inputs[0] = L_in + D.inputs[1] = x_in + D.inputs[2] = V_in + D.inputs[3] = y_in + + # At steady state: L_in*x_in + V_in*y_in = L_out*x + V_out*y + # This is only true when dx/dt = 0, i.e. at the steady state composition. + # For now, just verify that total input flow = total output flow (CMO). + D.update(None) + total_in = L_in + V_in + total_out = D.outputs[0] + D.outputs[2] + self.assertAlmostEqual(total_in, total_out) + + def test_enrichment(self): + """Vapor should be enriched in light component relative to liquid.""" + D = DistillationTray(alpha=3.0, x0=0.3) + D.set_solver(EUF, parent=None) + + D.inputs[0] = 1.0 + D.inputs[1] = 0.5 + D.inputs[2] = 1.0 + D.inputs[3] = 0.5 + + D.update(None) + + x_out = D.outputs[1] + y_out = D.outputs[3] + + # With alpha > 1, y > x always + self.assertGreater(y_out, x_out) + + def test_jacobian_sign(self): + """Jacobian diagonal should be negative (stable).""" + D = DistillationTray(alpha=2.5, x0=0.5, M=1.0) + D.set_solver(EUF, parent=None) + + L_in, V_in = 1.0, 1.0 + x = np.array([0.5]) + u = np.array([L_in, 0.5, V_in, 0.5]) + + # Evaluate jacobian + J = D.op_dyn.jac_x(x, u, 0) + self.assertLess(J[0, 0], 0) # stable + + +# RUN TESTS LOCALLY ==================================================================== + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/tests/process/test_flash_drum.py b/tests/process/test_flash_drum.py new file mode 100644 index 0000000..454e04a --- /dev/null +++ b/tests/process/test_flash_drum.py @@ -0,0 +1,143 @@ +######################################################################################## +## +## TESTS FOR +## 'process.flash_drum.py' +## +######################################################################################## + +# IMPORTS ============================================================================== + +import unittest +import numpy as np + +from pathsim_chem.process import FlashDrum + +from pathsim.solvers import EUF + + +# TESTS ================================================================================ + +class TestFlashDrum(unittest.TestCase): + """Test the binary isothermal flash drum block.""" + + def test_init_default(self): + """Test default initialization.""" + F = FlashDrum() + self.assertEqual(F.holdup, 100.0) + self.assertEqual(len(F.antoine_A), 2) + self.assertEqual(len(F.antoine_B), 2) + self.assertEqual(len(F.antoine_C), 2) + + def test_init_custom(self): + """Test custom initialization.""" + F = FlashDrum(holdup=200.0, + antoine_A=[15.0, 16.0], + antoine_B=[2800.0, 3100.0], + antoine_C=[-50.0, -55.0], + N0=[120.0, 80.0]) + self.assertEqual(F.holdup, 200.0) + + F.set_solver(EUF, parent=None) + self.assertTrue(np.allclose(F.engine.initial_value, [120.0, 80.0])) + + def test_init_validation(self): + """Test input validation.""" + with self.assertRaises(ValueError): + FlashDrum(holdup=0) + with self.assertRaises(ValueError): + FlashDrum(holdup=-10) + with self.assertRaises(ValueError): + FlashDrum(antoine_A=[1.0, 2.0, 3.0]) # not binary + + def test_port_labels(self): + """Test port label definitions.""" + self.assertEqual(FlashDrum.input_port_labels["F"], 0) + self.assertEqual(FlashDrum.input_port_labels["z_1"], 1) + self.assertEqual(FlashDrum.input_port_labels["T"], 2) + self.assertEqual(FlashDrum.input_port_labels["P"], 3) + self.assertEqual(FlashDrum.output_port_labels["V_rate"], 0) + self.assertEqual(FlashDrum.output_port_labels["L_rate"], 1) + self.assertEqual(FlashDrum.output_port_labels["y_1"], 2) + self.assertEqual(FlashDrum.output_port_labels["x_1"], 3) + + def test_default_holdup(self): + """Default holdup should split equally between components.""" + F = FlashDrum(holdup=100.0) + F.set_solver(EUF, parent=None) + self.assertTrue(np.allclose(F.engine.initial_value, [50.0, 50.0])) + + def test_output_flow_balance(self): + """V_rate + L_rate should equal F_in.""" + F = FlashDrum() + F.set_solver(EUF, parent=None) + + F_in = 10.0 + F.inputs[0] = F_in # F + F.inputs[1] = 0.5 # z_1 + F.inputs[2] = 360.0 # T [K] + F.inputs[3] = 101325.0 # P [Pa] + + F.update(None) + + V_rate = F.outputs[0] + L_rate = F.outputs[1] + self.assertAlmostEqual(V_rate + L_rate, F_in, places=8) + + def test_vle_consistency(self): + """y_1 and x_1 should be in [0, 1].""" + F = FlashDrum() + F.set_solver(EUF, parent=None) + + F.inputs[0] = 10.0 # F + F.inputs[1] = 0.5 # z_1 + F.inputs[2] = 360.0 # T [K] + F.inputs[3] = 101325.0 # P [Pa] + + F.update(None) + + y_1 = F.outputs[2] + x_1 = F.outputs[3] + self.assertGreaterEqual(y_1, 0.0) + self.assertLessEqual(y_1, 1.0) + self.assertGreaterEqual(x_1, 0.0) + self.assertLessEqual(x_1, 1.0) + + def test_light_component_enriched_in_vapor(self): + """The more volatile component should be enriched in vapor phase.""" + # Default Antoine params: component 1 has lower B (higher Psat) => more volatile + F = FlashDrum() + F.set_solver(EUF, parent=None) + + F.inputs[0] = 10.0 + F.inputs[1] = 0.5 + F.inputs[2] = 360.0 + F.inputs[3] = 101325.0 + + F.update(None) + + y_1 = F.outputs[2] + x_1 = F.outputs[3] + + # y_1 > x_1 for the light (more volatile) component + self.assertGreater(y_1, x_1) + + def test_no_feed(self): + """With zero feed, rates should be zero.""" + F = FlashDrum() + F.set_solver(EUF, parent=None) + + F.inputs[0] = 0.0 # F = 0 + F.inputs[1] = 0.5 + F.inputs[2] = 350.0 + F.inputs[3] = 101325.0 + + F.update(None) + + self.assertAlmostEqual(F.outputs[0], 0.0) # V_rate + self.assertAlmostEqual(F.outputs[1], 0.0) # L_rate + + +# RUN TESTS LOCALLY ==================================================================== + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/tests/process/test_heat_exchanger.py b/tests/process/test_heat_exchanger.py new file mode 100644 index 0000000..7ba0549 --- /dev/null +++ b/tests/process/test_heat_exchanger.py @@ -0,0 +1,125 @@ +######################################################################################## +## +## TESTS FOR +## 'process.heat_exchanger.py' +## +######################################################################################## + +# IMPORTS ============================================================================== + +import unittest +import numpy as np + +from pathsim_chem.process import HeatExchanger + +from pathsim.solvers import EUF + + +# TESTS ================================================================================ + +class TestHeatExchanger(unittest.TestCase): + """Test the counter-current heat exchanger block.""" + + def test_init_default(self): + """Test default initialization.""" + H = HeatExchanger() + self.assertEqual(H.N_cells, 5) + self.assertEqual(H.F_h, 0.1) + self.assertEqual(H.F_c, 0.1) + self.assertEqual(H.UA, 500.0) + + def test_init_custom(self): + """Test custom initialization.""" + H = HeatExchanger(N_cells=10, F_h=0.5, F_c=0.3, V_h=1.0, V_c=0.8, + UA=1000.0, rho_h=900.0, Cp_h=3500.0, + rho_c=1000.0, Cp_c=4184.0, + T_h0=400.0, T_c0=290.0) + self.assertEqual(H.N_cells, 10) + self.assertEqual(H.F_h, 0.5) + + H.set_solver(EUF, parent=None) + state = H.engine.initial_value + self.assertEqual(len(state), 20) # 2 * N_cells + self.assertTrue(np.all(state[0::2] == 400.0)) # hot cells + self.assertTrue(np.all(state[1::2] == 290.0)) # cold cells + + def test_init_validation(self): + """Test input validation.""" + with self.assertRaises(ValueError): + HeatExchanger(N_cells=0) + with self.assertRaises(ValueError): + HeatExchanger(F_h=-1) + with self.assertRaises(ValueError): + HeatExchanger(F_c=0) + with self.assertRaises(ValueError): + HeatExchanger(V_h=-1) + with self.assertRaises(ValueError): + HeatExchanger(V_c=0) + + def test_port_labels(self): + """Test port label definitions.""" + self.assertEqual(HeatExchanger.input_port_labels["T_h_in"], 0) + self.assertEqual(HeatExchanger.input_port_labels["T_c_in"], 1) + self.assertEqual(HeatExchanger.output_port_labels["T_h_out"], 0) + self.assertEqual(HeatExchanger.output_port_labels["T_c_out"], 1) + + def test_state_size(self): + """Test that state vector has correct size.""" + for N in [1, 3, 10]: + H = HeatExchanger(N_cells=N) + H.set_solver(EUF, parent=None) + self.assertEqual(len(H.engine.initial_value), 2 * N) + + def test_output_initial(self): + """Test outputs at initial state.""" + H = HeatExchanger(N_cells=3, T_h0=400.0, T_c0=300.0) + H.set_solver(EUF, parent=None) + H.update(None) + + # Hot outlet = last hot cell = T_h0 initially + self.assertAlmostEqual(H.outputs[0], 400.0) + # Cold outlet = first cold cell = T_c0 initially + self.assertAlmostEqual(H.outputs[1], 300.0) + + def test_energy_conservation_direction(self): + """Test that heat flows from hot to cold (derivatives have correct sign).""" + H = HeatExchanger(N_cells=1, F_h=0.1, F_c=0.1, V_h=0.5, V_c=0.5, + UA=500.0, T_h0=400.0, T_c0=300.0) + H.set_solver(EUF, parent=None) + + # Set inlet temps equal to initial (no flow effects, only heat transfer) + H.inputs[0] = 400.0 # T_h_in + H.inputs[1] = 300.0 # T_c_in + + # Evaluate dynamics + x = H.engine.get() + u = np.array([400.0, 300.0]) + dx = H.op_dyn(x, u, 0) + + # Hot side should cool (negative dT_h) due to heat transfer + # Cold side should warm (positive dT_c) due to heat transfer + # With flow terms = 0 (since inlet = state), only UA term matters + self.assertLess(dx[0], 0) # dT_h < 0 (cooling) + self.assertGreater(dx[1], 0) # dT_c > 0 (warming) + + def test_no_transfer_equal_temps(self): + """When hot and cold are at same temperature, no heat transfer occurs.""" + T_eq = 350.0 + H = HeatExchanger(N_cells=2, T_h0=T_eq, T_c0=T_eq) + H.set_solver(EUF, parent=None) + + H.inputs[0] = T_eq + H.inputs[1] = T_eq + + x = H.engine.get() + u = np.array([T_eq, T_eq]) + dx = H.op_dyn(x, u, 0) + + # All derivatives should be zero + self.assertTrue(np.allclose(dx, 0.0, atol=1e-10)) + + +# RUN TESTS LOCALLY ==================================================================== + +if __name__ == '__main__': + unittest.main(verbosity=2)