import pandas as pd
import json
import data_input
# Supply
from data_input import (d_pipe_ext_supply_m, d_pipe_nom_supply_mm, l_pipesection_supply_m, th_insulation_supply_percent, location_supply,
mdot_takeoff_supply_kg_per_s, lat_supply, lon_supply, )
# Return
from data_input import (d_pipe_ext_return_m, d_pipe_nom_return_mm, l_pipesection_return_m, th_insulation_return_percent, location_return,
mdot_takeoff_return_kg_per_s, lat_return, lon_return, )
import data_output
from data_output import PipeRow, SystemRow
from config_data import BranchInitialConfig
from utils.constants import TZERO
from utils.functions import (calculate_flow_velocity, calculate_fluid_density, calculate_fluid_specific_heat, calculate_pipe_internal_diameter,
calculate_insulation_external_diameter, calculate_insulation_thickness,
select_heat_transfer_coeff, calculate_r_total, select_ambient_temperature,
calculate_output_temperature, )
from utils.validation import validate_damage
from utils.exceptions import SupplyDataMissingError
from model_param import ThermalCoeff, PipeSectionLocation
# Getting thickness data
# Location of the JSON file with pipe-wall and insulation thickness tables.
# NOTE(review): the path is relative, so it is resolved against the current working
# directory of the process, not against this file - confirm this is intended.
thickness_data_location = "../data/insulation_thickness.json"

# Load the thickness tables once at import time. A missing file is tolerated
# (best effort): a message is printed and an empty table is used, in which case
# Branch() needs its own ``th_values`` argument.
try:
    # JSON text is UTF-8; opening with an explicit encoding avoids depending on the
    # platform's locale encoding.
    with open(thickness_data_location, "r", encoding="utf-8") as th_file:
        data = json.load(th_file)
except FileNotFoundError:
    print("File not found.")
    th_mm = {}
else:
    # Everything of interest lives under the "thickness_data" key.
    th_mm = data.get("thickness_data", {})
# Calculations
class Branch:
    """
    Heat-loss calculation for one pipeline branch consisting of a supply and a return line.

    The per-element input (diameters, section lengths, locations, take-off mass flows,
    coordinates) is read from the ``data_input`` module. Results are appended to the
    DataFrames ``df_supply_out``, ``df_return_out`` and ``df_system_out`` of the
    ``data_output`` module.

    Expected call order: ``calculate_supply()`` -> ``calculate_return()`` ->
    ``calculate_system_heat_flow()``.
    """

    class_instances_created = 0        # number of Branch objects created with a 'name' argument (class attribute only!)
    _DAMAGE_MODE_AVERAGE = "average"   # class constants
    _DAMAGE_MODE_ELEMENT = "element"

    def __init__(self, initial_values = None, th_values: dict = None, **kwargs):
        """
        Sets up the initial values, the thickness tables and the insulation damage mode.

        Parameters
        ----------
        initial_values
            Object providing the initial values (read through ``self.iv``).
            A new ``BranchInitialConfig()`` is used when the argument is falsy.
        th_values : dict
            Pipe and insulation thickness tables. The module-level ``th_mm``
            (loaded from the JSON file) is used when the argument is falsy.
            Must contain the keys ``"th_pipe"`` and
            ``"th_insulation_{channel,surface,soil}_{supply,return}"``.
        **kwargs
            ``damage_mode`` : "average" (default) or "element".
            ``damage``      : damaged insulation thickness data; defaults to the average
            damage from the initial values in "average" mode and to ``[]`` otherwise.
            ``name``        : optional instance name; only named instances are counted.

        Raises
        ------
        KeyError
            If one of the required thickness tables is missing.
        """
        self.iv = initial_values or BranchInitialConfig()
        self.th_all = th_values or th_mm    # pipe & insulation thickness from data in the JSON file
        self.tolerance = 0.001              # passed to <calculate_output_temperature> (its while loop)

        # Pipe thickness (data from the JSON file)
        self.th_pipe = self.th_all["th_pipe"]

        # Average thickness of the damaged insulation
        self.th_avg_damage = self.iv.th_avg_ins_damage_m

        # From **kwargs: thickness of the damaged insulation (validated in utils\validation.py)
        self.ins_damage_mode = kwargs.get("damage_mode", self.__class__._DAMAGE_MODE_AVERAGE)
        damage = kwargs.get("damage", self.th_avg_damage if self.ins_damage_mode == self.__class__._DAMAGE_MODE_AVERAGE else [])
        self.th_ins_damage_avg_m, self.th_ins_damage_elem_percent = validate_damage(self.ins_damage_mode, damage)

        # Insulation thickness tables, selected by the location of the pipe section
        self.th_ins_supply_dict = {
            PipeSectionLocation.loc1.value: self.th_all["th_insulation_channel_supply"],
            PipeSectionLocation.loc2.value: self.th_all["th_insulation_surface_supply"],
            PipeSectionLocation.loc3.value: self.th_all["th_insulation_soil_supply"]
        }
        self.th_ins_return_dict = {
            PipeSectionLocation.loc1.value: self.th_all["th_insulation_channel_return"],
            PipeSectionLocation.loc2.value: self.th_all["th_insulation_surface_return"],
            PipeSectionLocation.loc3.value: self.th_all["th_insulation_soil_return"]
        }

        # From **kwargs: counting the number of instances
        # (only objects created with the 'name' argument are counted)
        self.class_instance_name = kwargs.get("name", None)
        if self.class_instance_name is not None:
            self.__class__.class_instances_created += 1

    #______________________ Helper methods ____________________________________
    def _lookup_pipe_wall_thickness_m(self, d_nominal: str) -> float:
        """
        Returns the pipe wall thickness for a given nominal pipe size.

        The table stores the value in [mm]; it is converted to [m] here.

        Raises
        ------
        KeyError
            If the nominal size is not present in the pipe wall thickness table.
        """
        if d_nominal not in self.th_pipe:
            raise KeyError(f"Nominal pipe size {d_nominal} not found in pipe wall thickness data. Please choose a different pipe size.")
        return self.th_pipe[d_nominal] / 1000

    def _calculate_internal_diameter(self, d_nominal:str, d_external_m:float) -> float:
        """
        Returns the pipe internal diameter for a given nominal pipe size.
        Used to calculate initial values.

        Returns
        -------
        float
            Internal diameter in [m]

        Raises
        ------
        KeyError
            If the nominal size is not present in the pipe wall thickness table.
        """
        th_pipe_m = self._lookup_pipe_wall_thickness_m(d_nominal)   # pipe thickness
        return d_external_m - (2 * th_pipe_m)                       # pipe internal diameter

    def _resolve_insulation(self, location, d_nominal: str, d_pipe_ext_m: float, th_insulation_input, th_ins_dict: dict) -> tuple:
        """
        Selects the insulation of one element according to the damage mode.

        * "average": an input value of 0 marks a damaged element, which gets the average
          damaged thickness and the damaged-insulation conductivity; any other value
          selects the tabulated thickness for the location / nominal diameter.
        * "element": the tabulated thickness is multiplied by the input value of the element.
          NOTE(review): the input is named "percent" but is used as a plain factor -
          confirm the unit of the input column.

        Returns
        -------
        tuple
            (insulation external diameter, insulation thermal conductivity)

        Raises
        ------
        ValueError
            If the damage mode is neither "average" nor "element".
        """
        if self.ins_damage_mode == self.__class__._DAMAGE_MODE_AVERAGE:
            if th_insulation_input == 0:
                th_insulation_m = self.th_ins_damage_avg_m
                k_ins_w_per_mk = ThermalCoeff.k_ins_damaged_w_per_mk
            else:
                th_insulation_m = calculate_insulation_thickness(location, d_nominal, th_ins_dict)
                k_ins_w_per_mk = ThermalCoeff.k_ins_w_per_mk
        elif self.ins_damage_mode == self.__class__._DAMAGE_MODE_ELEMENT:
            th_insulation_m = calculate_insulation_thickness(location, d_nominal, th_ins_dict) * th_insulation_input
            k_ins_w_per_mk = ThermalCoeff.k_ins_w_per_mk
        else:
            # Without this branch an unknown mode would hit an unbound local (or silently
            # reuse the values of the previous element).
            raise ValueError(f"Unknown insulation damage mode: {self.ins_damage_mode!r}")
        d_insulation_ext_m = calculate_insulation_external_diameter(d_pipe_ext_m, th_insulation_m)
        return d_insulation_ext_m, k_ins_w_per_mk

    def _calculate_element_resistance(self, location, d_nominal: str, d_pipe_ext_m: float, l_m: float, th_insulation_input, th_ins_dict: dict) -> tuple:
        """
        Defines geometry and coefficients of one element and returns
        (pipe internal diameter, total thermal resistance of the element).
        """
        th_pipe_m = self._lookup_pipe_wall_thickness_m(d_nominal)
        d_pipe_int_m = calculate_pipe_internal_diameter(d_pipe_ext_m, th_pipe_m)
        # heat transfer coefficient insulation ---> external environment, depends on the location
        h_ins2ambient_w_per_m2k = select_heat_transfer_coeff(location)
        d_insulation_ext_m, k_ins_w_per_mk = self._resolve_insulation(location, d_nominal, d_pipe_ext_m, th_insulation_input, th_ins_dict)
        # arguments: pipe int. diam. - length - water to pipe - pipe ext. diam. - through pipe
        #            - insulation ext. diam. - through insulation - to ambient
        r_total = calculate_r_total(d_pipe_int_m, l_m, ThermalCoeff.h_water_w_per_m2k, d_pipe_ext_m,
                                    ThermalCoeff.k_pipe_w_per_mk, d_insulation_ext_m, k_ins_w_per_mk,
                                    h_ins2ambient_w_per_m2k)
        return d_pipe_int_m, r_total

    @classmethod
    def get_class_instance_count(cls) -> int:
        """
        Returns the number of Branch objects created (used for tracking the number of instances created).
        Only instances created with the 'name' argument are counted.

        Returns
        -------
        int
            Number of class instances
        """
        return cls.class_instances_created

    def calculate_branch_length(self) -> float:
        """
        Sums lengths of all supply pipeline elements to get the length of the analysed branch.

        Returns
        -------
        float
            Length in [m]
        """
        return l_pipesection_supply_m.sum()

    #____________________ CALCULATIONS - SUPPLY _______________________________
    def calculate_supply(self):
        """
        Performs calculations only for the supply branch.

        Walks through all supply elements in order; for each element it computes the
        thermal resistance, the outlet temperature and heat flow loss, the flow velocity
        and the consumer heat flows. One row per element is appended to
        ``data_output.df_supply_out`` (all rows are appended together after the loop).

        Raises
        ------
        KeyError
            If a nominal diameter is missing in the pipe wall thickness table.
        ValueError
            If the insulation damage mode is unknown.
        """
        # (i) Branch setup:
        th_ins_supply_dict_i = self.th_ins_supply_dict

        # (ii) Initial values:
        # Temperatures passed to the fluid property functions are shifted by TZERO
        # (presumably degC -> K; confirm in utils.constants).
        t_in_s_i_c = self.iv.t_in_supply_c                                  # inlet temperature of the first section
        den_s_i_kg_per_m3 = calculate_fluid_density(p=self.iv.p_nominal_pa,
                                                    t=t_in_s_i_c - TZERO,
                                                    fluid=self.iv.fluid)
        mdot_s_i_kg_per_s = self.iv.vdot_m3_per_h * den_s_i_kg_per_m3 / 3600 # mass flow from the volumetric flow [m3/h]
        l_tot_s_i_m = 0                                                     # position on the pipeline
        qdot_loss_tot_s_i_w = 0                                             # accumulated heat flow loss
        qdot_in_tot_s_i_w = mdot_s_i_kg_per_s * (t_in_s_i_c - TZERO) * calculate_fluid_specific_heat(   # total (absolute) heat flow at the start
                                                    p=self.iv.p_nominal_pa,
                                                    t=t_in_s_i_c - TZERO,
                                                    fluid=self.iv.fluid)

        rows_supply = []    # collected here and appended once: concat inside the loop copies the frame every iteration
        for i in range(len(data_input.df_supply_in)):
            ### PART 1: DEFINING GEOMETRY & COEFFICIENTS & THERMAL RESISTANCE FOR EACH ELEMENT
            location_s_i = location_supply[i]
            d_pipe_ext_s_i_m = d_pipe_ext_supply_m[i]
            d_nom_s_i = str(int(d_pipe_nom_supply_mm[i]))       # DN - nominal diameter (key of the thickness tables)
            l_s_i_m = l_pipesection_supply_m[i]                 # length of the section
            d_pipe_int_s_i_m, r_total_s_i_w_per_k = self._calculate_element_resistance(
                location_s_i, d_nom_s_i, d_pipe_ext_s_i_m, l_s_i_m,
                th_insulation_supply_percent[i], th_ins_supply_dict_i)

            ### PART 2: HEAT FLOW LOSS CALCULATION FOR EACH ELEMENT
            t_ambient_s_i_c = select_ambient_temperature(location_s_i)
            # same fluid as for the initial values (the fluid argument was previously omitted here)
            cp_s_i_ws_per_kgk = calculate_fluid_specific_heat(p=self.iv.p_nominal_pa,
                                                              t=t_in_s_i_c - TZERO,
                                                              fluid=self.iv.fluid)
            # outlet node temperature and element heat flow loss
            t_out_s_i_c, qdot_loss_s_i_w = calculate_output_temperature(t_in_s_i_c, t_ambient_s_i_c, mdot_s_i_kg_per_s,
                                                                         cp_s_i_ws_per_kgk, r_total_s_i_w_per_k, self.tolerance)

            ### PART 3: OTHER CALCULATIONS
            # (i) Position on the pipeline
            l_tot_s_i_m += l_s_i_m
            # (ii) Total heat flow loss
            qdot_loss_tot_s_i_w += qdot_loss_s_i_w
            # (iii) Flow velocity (density at the element inlet temperature)
            den_s_i_kg_per_m3 = calculate_fluid_density(p=self.iv.p_nominal_pa, t=t_in_s_i_c - TZERO, fluid=self.iv.fluid)
            v_s_i_m_per_s = calculate_flow_velocity(den_s_i_kg_per_m3, mdot_s_i_kg_per_s, d_pipe_int_s_i_m)
            # (iv) Heat flow for the consumer - absolute value
            qdot_cons_abs_s_i_w = abs(mdot_takeoff_supply_kg_per_s[i]) * cp_s_i_ws_per_kgk * (t_out_s_i_c - TZERO)
            # (v) Useful heat flow for the consumer (relative to the consumer release temperature)
            qdot_cons_act_s_i_w = abs(mdot_takeoff_supply_kg_per_s[i]) * cp_s_i_ws_per_kgk * (t_out_s_i_c - self.iv.t_consumer_release_c)
            # (vi) Total heat flow in the system
            qdot_in_tot_s_i_w = qdot_in_tot_s_i_w - qdot_loss_s_i_w - qdot_cons_abs_s_i_w

            ### PART 4: COLLECTING CALCULATED VALUES FOR THE DATAFRAME
            row_supply_i = PipeRow(                             # PipeRow class from data_output.py
                lat=lat_supply[i],
                lon=lon_supply[i],
                l_tot=l_tot_s_i_m,
                t_in=t_out_s_i_c,                               # the row stores the outlet temperature of the element
                mdot=mdot_s_i_kg_per_s,                         # mass flow before the take-off of this element
                qdot_loss=qdot_loss_s_i_w,
                qdotnorm_loss=qdot_loss_s_i_w / l_s_i_m,        # qdot loss normalised [W/m]
                qdot_loss_tot=qdot_loss_tot_s_i_w,
                v_fluid=v_s_i_m_per_s,
                mdot_consumer=mdot_takeoff_supply_kg_per_s[i],
                qdot_consumer_abs=qdot_cons_abs_s_i_w,
                qdot_consumer_act=qdot_cons_act_s_i_w,
                qdot_tot=qdot_in_tot_s_i_w
            )
            rows_supply.append(row_supply_i.convert_to_dict_pipe())

            ### PART 5: SETTING VALUES FOR THE NEXT NODE (i --> i+1)
            t_in_s_i_c = t_out_s_i_c                            # outlet of this element is the inlet of the next one
            # The take-off is added, so it reduces the flow only if the take-off values are
            # negative (presumably they are - abs() is applied to them above; confirm in the input file).
            mdot_s_i_kg_per_s += mdot_takeoff_supply_kg_per_s[i]

        if rows_supply:
            data_output.df_supply_out = pd.concat([data_output.df_supply_out, pd.DataFrame(rows_supply)], ignore_index=True)
    #''''''''''''''''''''''' SUPPLY END '''''''''''''''''''''''''''''''''''''''

    #____________________ CALCULATIONS - RETURN _______________________________
    def calculate_return(self):
        """
        Performs calculations only for the return branch.

        Requires the results of ``calculate_supply()``: the starting mass flow is the
        ``"mdot [kg/s]"`` value of the last supply row, and the consumer return flows are
        taken from the supply results. One row per element is appended to
        ``data_output.df_return_out`` (all rows are appended together after the loop).

        Raises
        ------
        SupplyDataMissingError
            If the supply DataFrame is empty.
        KeyError
            If a nominal diameter is missing in the pipe wall thickness table.
        ValueError
            If the insulation damage mode is unknown.
        """
        # (i) Check supply data:
        if data_output.df_supply_out.empty:     # checks if the calculation of the supply line has been run
            raise SupplyDataMissingError("Supply DataFrame is empty. Make sure method < calculate_supply() > is run before < calculate_return() >. Supply data must be calculated before calculating return.")

        # (ii) Branch setup:
        th_ins_return_dict_i = self.th_ins_return_dict

        # (iii) Initial values:
        t_in_r_i_c = self.iv.t_in_return_c                                      # inlet temperature of the first return section
        # assumption: the entire flow of the last supply node goes to the return line
        mdot_r_i_kg_per_s = data_output.df_supply_out["mdot [kg/s]"].iloc[-1]
        l_tot_r_i_m = self.calculate_branch_length()                            # the return line is walked back from the end of the branch
        qdot_loss_tot_r_i_w = 0                                                 # accumulated heat flow loss
        qdot_in_tot_r_i_w = mdot_r_i_kg_per_s * (t_in_r_i_c - TZERO) * calculate_fluid_specific_heat(   # total (absolute) heat flow at the start of the return pipeline
                                                    p=self.iv.p_nominal_pa,
                                                    t=t_in_r_i_c - TZERO,
                                                    fluid=self.iv.fluid)

        # j walks the supply elements backwards to find the consumer that feeds each return take-off
        j = len(data_input.df_supply_in) - 1
        rows_return = []    # collected here and appended once after the loop
        for i in range(len(data_input.df_return_in)):
            ### PART 1: DEFINING GEOMETRY & COEFFICIENTS & THERMAL RESISTANCE FOR EACH ELEMENT
            location_r_i = location_return[i]
            d_pipe_ext_r_i_m = d_pipe_ext_return_m[i]
            d_nom_r_i = str(int(d_pipe_nom_return_mm[i]))       # DN - nominal diameter (key of the thickness tables)
            l_r_i_m = l_pipesection_return_m[i]                 # length of the section
            d_pipe_int_r_i_m, r_total_r_i_w_per_k = self._calculate_element_resistance(
                location_r_i, d_nom_r_i, d_pipe_ext_r_i_m, l_r_i_m,
                th_insulation_return_percent[i], th_ins_return_dict_i)

            ### PART 2: HEAT FLOW LOSS CALCULATION FOR EACH ELEMENT
            t_ambient_r_i_c = select_ambient_temperature(location_r_i)
            # same fluid as for the initial values (the fluid argument was previously omitted here)
            cp_r_i_ws_per_kgk = calculate_fluid_specific_heat(p=self.iv.p_nominal_pa,
                                                              t=t_in_r_i_c - TZERO,
                                                              fluid=self.iv.fluid)
            # outlet node temperature and element heat flow loss
            t_out_r_i_c, qdot_loss_r_i_w = calculate_output_temperature(t_in_r_i_c, t_ambient_r_i_c, mdot_r_i_kg_per_s,
                                                                         cp_r_i_ws_per_kgk, r_total_r_i_w_per_k, self.tolerance)

            # Connecting the correct sections of the supply and return lines, because they may
            # not have the same number of elements (non-symmetrical pipelines)
            if data_input.df_return_in["mdot take-off [kg/s]"][i] != 0:
                # skip supply elements without a take-off
                # NOTE(review): j is not bounded below; more return take-offs than supply
                # take-offs would run past the start of the supply data.
                while data_input.df_supply_in["mdot take-off [kg/s]"][j] == 0:
                    j -= 1
                mdot_consumer_r_i_kg_per_s = - data_output.df_supply_out["mdot consumer [kg/s]"][j]
                j -= 1
            else:
                mdot_consumer_r_i_kg_per_s = 0

            # Temperature of the mixture on the return line: fluid released by the consumer
            # mixes with the fluid already in the return line (mass-flow weighted average)
            t_mix_r_i_c = ((t_out_r_i_c * mdot_r_i_kg_per_s) + (self.iv.t_consumer_release_c * mdot_consumer_r_i_kg_per_s)) / (mdot_r_i_kg_per_s + mdot_consumer_r_i_kg_per_s)

            ### PART 3: OTHER CALCULATIONS
            # (i) Position on the pipeline (counted down from the branch length)
            l_tot_r_i_m -= l_r_i_m
            # (ii) Total heat flow loss
            qdot_loss_tot_r_i_w += qdot_loss_r_i_w
            # (iii) Flow velocity (density at the element inlet temperature)
            den_r_i_kg_per_m3 = calculate_fluid_density(p=self.iv.p_nominal_pa, t=t_in_r_i_c - TZERO, fluid=self.iv.fluid)
            v_r_i_m_per_s = calculate_flow_velocity(den_r_i_kg_per_m3, mdot_r_i_kg_per_s, d_pipe_int_r_i_m)
            # (iv) Heat flow for the consumer - absolute value
            qdot_cons_abs_r_i_w = abs(mdot_takeoff_return_kg_per_s[i]) * cp_r_i_ws_per_kgk * (t_out_r_i_c - TZERO)
            # (v) Useful heat flow for the consumer (relative to the consumer release temperature)
            qdot_cons_act_r_i_w = abs(mdot_takeoff_return_kg_per_s[i]) * cp_r_i_ws_per_kgk * (t_out_r_i_c - self.iv.t_consumer_release_c)
            # (vi) Total heat flow in the system
            qdot_in_tot_r_i_w = qdot_in_tot_r_i_w - qdot_loss_r_i_w + qdot_cons_abs_r_i_w

            ### PART 4: COLLECTING CALCULATED VALUES FOR THE DATAFRAME
            row_return_i = PipeRow(                             # PipeRow class from data_output.py
                lat=lat_return[i],
                lon=lon_return[i],
                l_tot=l_tot_r_i_m,
                t_in=t_out_r_i_c,                               # the row stores the outlet temperature of the element
                mdot=mdot_r_i_kg_per_s,                         # mass flow before the consumer inflow of this element
                qdot_loss=qdot_loss_r_i_w,
                qdotnorm_loss=qdot_loss_r_i_w / l_r_i_m,        # qdot loss normalised [W/m]
                qdot_loss_tot=qdot_loss_tot_r_i_w,
                v_fluid=v_r_i_m_per_s,
                mdot_consumer=mdot_consumer_r_i_kg_per_s,
                qdot_consumer_abs=qdot_cons_abs_r_i_w,
                qdot_consumer_act=qdot_cons_act_r_i_w,
                qdot_tot=qdot_in_tot_r_i_w
            )
            rows_return.append(row_return_i.convert_to_dict_pipe())

            ### PART 5: SETTING VALUES FOR THE NEXT NODE (i --> i+1)
            t_in_r_i_c = t_mix_r_i_c                            # the next element starts at the mixing temperature
            mdot_r_i_kg_per_s += mdot_consumer_r_i_kg_per_s     # adding the flow coming back from the consumer to the return flow

        if rows_return:
            data_output.df_return_out = pd.concat([data_output.df_return_out, pd.DataFrame(rows_return)], ignore_index=True)
    #''''''''''''''''''''''' RETURN END '''''''''''''''''''''''''''''''''''''''

    #____________________ CALCULATIONS - SYSTEM _______________________________
    def calculate_system_heat_flow(self) -> None:
        """
        Gathers total heat flow data from the supply and return calculations to calculate the total system heat flow:
            Qdot_total_system = Qdot_total_supply - Qdot_total_return.

        One row per supply element is appended to ``data_output.df_system_out``
        (all rows are appended together after the loop). The heat flows are read from the
        ``"Qdot tot [W]"`` columns and divided by 1e6 before the subtraction.
        """
        n_supply = len(data_output.df_supply_out)
        n_return = len(data_output.df_return_out)
        # Length of the shorter dataframe
        length_qdot_sys = min(n_supply, n_return)
        # Number of extra rows on the return side (0 when the supply side is longer)
        diff = max(0, n_return - n_supply)

        j = 0               # index of the next supply value to use
        rows_system = []    # collected here and appended once after the loop
        for i in range(n_supply):
            # NOTE(review): for i == 0 the index below equals length_qdot_sys, which is outside
            # the return data unless the return line has more elements than the supply line.
            if (data_input.mdot_takeoff_return_kg_per_s[length_qdot_sys - i] != 0) or (i == length_qdot_sys):
                qdot_sys_supply_i = data_output.df_supply_out["Qdot tot [W]"][j]/1e6
                qdot_sys_return_i = data_output.df_return_out["Qdot tot [W]"][n_return - diff - i]/1e6
                qdot_system_i_w = qdot_sys_supply_i - qdot_sys_return_i     # W / 1e6, despite the "_w" suffix
                j += 1
            # NOTE(review): when the condition above is false the value of the previous
            # iteration is written again (and it is unbound if that happens on the first iteration).
            row_system_i = SystemRow(
                lat=lat_supply[i],
                lon=lon_supply[i],
                l_tot=data_output.df_supply_out["L tot [m]"][i],
                qdot_system=qdot_system_i_w
            )
            rows_system.append(row_system_i.convert_to_dict_system())

        if rows_system:
            data_output.df_system_out = pd.concat([data_output.df_system_out, pd.DataFrame(rows_system)], ignore_index=True)
    #''''''''''''''''''''''' SYSTEM END '''''''''''''''''''''''''''''''''''''''