WeatherRoutingTool.constraints package

Submodules

WeatherRoutingTool.constraints.constraints module

class WeatherRoutingTool.constraints.constraints.Constraint(name)[source]

Bases: object

Main class for handling constraints. Constraints implemented so far: LandCrossing (prohibit land crossing), WaterDepth (prohibit crossing of areas with too low water depth), StayOnMap (prohibit leaving the area for which the weather data has been obtained).

constraint_on_point(lat, lon, time)[source]
lat: ndarray
lon: ndarray
message: str
name: str
plot_constraint()[source]
plot_route_in_constraint(route: RouteParams, colour, fig, ax)[source]
print_constraint_message()[source]
print_debug(message)[source]
print_info()[source]
time: ndarray
class WeatherRoutingTool.constraints.constraints.ConstraintPars[source]

Bases: object

Class that holds the parameters used to initialise a ConstraintsList

bCheckCrossing: bool
bCheckEndPoints: bool
print()[source]
resolution: int
class WeatherRoutingTool.constraints.constraints.ConstraintsList(pars)[source]

Bases: object

List of constraints. During the routing procedure, ConstraintsList.safe_crossing(lat_start, lon_start, lat_end, lon_end, current_time, is_constrained) checks for constraints between a starting point and a destination. Alternatively, ConstraintsList.safe_endpoint(lat, lon, current_time, is_constrained) checks whether a single point hits a constraint.

add_neg_constraint(constraint, option='discrete')[source]
add_pos_constraint(constraint)[source]
check_weather()[source]
constraints_crossed: list
current_positive: int
get_current_destination()[source]
get_current_start()[source]
have_negative()[source]
have_positive()[source]
init_positive_lists(start, finish)[source]
neg_cont_size: int
neg_dis_size: int
negative_constraints_continuous: list
negative_constraints_discrete: list
pars: ConstraintPars
pos_size: int
positive_constraints: list
positive_point_dict: dict
print_active_constraints()[source]
print_constraints_crossed()[source]
print_settings()[source]
reached_positive()[source]
safe_crossing(lat_start, lon_start, lat_end, lon_end, current_time, is_constrained)[source]
safe_crossing_continuous(lat_start, lon_start, lat_end, lon_end, is_constrained)[source]

TODO: add description _summary_

Parameters:
  • lat_start (numpy.ndarray or float) – Latitude of start point of section to check

  • lon_start (numpy.ndarray or float) – Longitude of start point of section to check

  • lat_end (numpy.ndarray or float) – Latitude of end point of section to check

  • lon_end (numpy.ndarray or float) – Longitude of end point of section to check

  • is_constrained (list[bool]) – List of booleans for every constraint stating if the section is constrained by it

Returns:

is_constrained.tolist()

Return type:

list[bool]

safe_crossing_discrete(lat_start, lon_start, lat_end, lon_end, current_time, is_constrained)[source]

Check whether there is a constraint on the way from a starting point (lat_start, lon_start) to the destination (lat_end, lon_end). To do so, the code segments the travel distance into steps (step length given by ConstraintPars.resolution) and loops through all these steps, calling ConstraintsList.safe_endpoint() for each of them.

Parameters:
  • lat_start (numpy.ndarray or float) – Latitude of start point of section to check

  • lon_start (numpy.ndarray or float) – Longitude of start point of section to check

  • lat_end (numpy.ndarray or float) – Latitude of end point of section to check

  • lon_end (numpy.ndarray or float) – Longitude of end point of section to check

  • current_time (datetime) – Time of point to check

  • is_constrained (list[bool]) – List of booleans for every constraint stating if the section is constrained by it

Returns:

is_constrained.tolist()

Return type:

list[bool]
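The stepping scheme described for safe_crossing_discrete can be sketched in a few lines. This is only an illustration under stated assumptions: the helper names (interpolate_points, crossing_is_constrained, east_of_five) are hypothetical, the number of steps stands in for ConstraintPars.resolution, and the intermediate points are obtained by plain linear interpolation in latitude and longitude, which may differ from what the tool does internally.

```python
from typing import Callable, List, Tuple

Point = Tuple[float, float]


def interpolate_points(lat_start: float, lon_start: float,
                       lat_end: float, lon_end: float,
                       n_steps: int) -> List[Point]:
    """Return n_steps + 1 evenly spaced points from start to end (inclusive).

    Linear interpolation in lat/lon is a simplification used for this sketch.
    """
    return [
        (lat_start + (lat_end - lat_start) * i / n_steps,
         lon_start + (lon_end - lon_start) * i / n_steps)
        for i in range(n_steps + 1)
    ]


def crossing_is_constrained(points: List[Point],
                            point_is_constrained: Callable[[float, float], bool]) -> bool:
    """True if any point on the way violates the (hypothetical) point check."""
    return any(point_is_constrained(lat, lon) for lat, lon in points)


def east_of_five(lat: float, lon: float) -> bool:
    """Hypothetical constraint: everything east of 5 degrees longitude is forbidden."""
    return lon > 5.0


blocked = crossing_is_constrained(interpolate_points(54.0, 3.0, 54.0, 7.0, 4), east_of_five)
free = crossing_is_constrained(interpolate_points(54.0, 3.0, 54.0, 4.0, 4), east_of_five)
```

A finer resolution catches narrower obstacles at the cost of more point checks per section.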

safe_endpoint(lat, lon, current_time, is_constrained)[source]

Check whether there is a constraint on the space-time point defined by lat, lon, time. To do so, the code loops over all Constraints added to the ConstraintsList.

Parameters:
  • lat (numpy.ndarray or float) – Latitude of point to check

  • lon (numpy.ndarray or float) – Longitude of point to check

  • current_time (datetime) – Time of point to check

  • is_constrained (list[bool]) – List of booleans for every constraint stating if the point is constrained by it

Returns:

is_constrained

Return type:

list[bool]
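The loop over constraints described for safe_endpoint might look roughly like the sketch below. Everything here is hypothetical: BoxConstraint and check_endpoints are illustrative names, and the sketch assumes that is_constrained carries one flag per candidate point, which is one possible reading of the parameter description above.

```python
from typing import List


class BoxConstraint:
    """Hypothetical negative constraint: points inside the box are forbidden."""

    def __init__(self, name, lat_min, lat_max, lon_min, lon_max):
        self.name = name
        self.box = (lat_min, lat_max, lon_min, lon_max)

    def constraint_on_point(self, lat: float, lon: float, time=None) -> bool:
        lat_min, lat_max, lon_min, lon_max = self.box
        return lat_min <= lat <= lat_max and lon_min <= lon <= lon_max


def check_endpoints(constraints, lats: List[float], lons: List[float]) -> List[bool]:
    """Flag every point that is hit by at least one constraint."""
    is_constrained = [False] * len(lats)
    for constraint in constraints:
        for i, (lat, lon) in enumerate(zip(lats, lons)):
            # Once a point is flagged it stays flagged (logical OR).
            is_constrained[i] = is_constrained[i] or constraint.constraint_on_point(lat, lon)
    return is_constrained


constraints = [BoxConstraint("shoal", 54.0, 55.0, 6.0, 7.0),
               BoxConstraint("danger_area", 50.0, 51.0, 1.0, 2.0)]
flags = check_endpoints(constraints, lats=[54.5, 50.5, 52.0], lons=[6.5, 1.5, 4.0])
```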

shall_I_pass(lat, lon, time)[source]
split_route()[source]
weather: WeatherCond
class WeatherRoutingTool.constraints.constraints.ConstraintsListFactory[source]

Bases: object

Initialise a ConstraintsList object and add all constraints that shall be considered

static get_constraints_list(constraints_string_list, **kwargs)[source]
class WeatherRoutingTool.constraints.constraints.ContinuousCheck(db_engine=None)[source]

Bases: NegativeContraint

Contains various functions to test data connection, gathering and use for obtaining spatial relations for the continuous check in the negative constraints

connect_database()[source]

Connect to the database

database: str
engine: sqlalchemy.engine
host: str
password: str
port: str
predicates: list
print_info()[source]
set_map_bbox(map_size)[source]
tags: list
user: str
class WeatherRoutingTool.constraints.constraints.LandCrossing[source]

Bases: NegativeContraint

Constraint such that the boat cannot cross land

constraint_on_point(lat, lon, time)[source]
print_info()[source]
class WeatherRoutingTool.constraints.constraints.LandPolygonsCrossing(map_size=None, db_engine=None)[source]

Bases: ContinuousCheck

Use the ‘LandPolygonsCrossing’ constraint cautiously. This class is yet to be tested.

build_landpolygon_query(map_size)[source]
check_crossing(lat_start, lon_start, lat_end, lon_end)[source]

Check whether the given route segments cross land polygons

Parameters:
  • lat_start (numpy.ndarray) – array of all origin latitudes of routing segments

  • lon_start (numpy.ndarray) – array of all origin longitudes of routing segments

  • lat_end (numpy.ndarray) – array of all destination latitudes of routing segments

  • lon_end (numpy.ndarray) – array of all destination longitudes of routing segments

Returns:

list of spatial relation result (True or False)

Return type:

list[bool]

land_polygon_STRTree = None
query_land_polygons(db_engine, query)[source]

Create new GeoDataFrame using public.ways table in the query

Parameters:
  • db_engine (sqlalchemy.engine.Engine) – sqlalchemy engine

  • query (str) – sql query for table ways

Returns:

gdf including all the features from public.ways table

Return type:

geopandas.GeoDataFrame

set_landpolygon_STRTree(db_engine=None, query=None)[source]
class WeatherRoutingTool.constraints.constraints.NegativeConstraintFromWeather(name, weather)[source]

Bases: NegativeContraint

Negative constraint which needs information from the weather (this includes depth information, which is stored in the netCDF weather file)

check_weather(lat, lon, time)[source]
wt: WeatherCond
class WeatherRoutingTool.constraints.constraints.NegativeContraint(name)[source]

Bases: Constraint

Handling of constraints where the ship MUST NOT pass a certain area (too low water depth, too high waves, danger areas…)

class WeatherRoutingTool.constraints.constraints.PositiveConstraint(name)[source]

Bases: Constraint

Handling of constraints where the ship NEEDS to take a certain route (e.g. waterways)

get_points()[source]
class WeatherRoutingTool.constraints.constraints.PositiveConstraintPoint(lat, lon)[source]

Bases: PositiveConstraint

coord: tuple
get_points()[source]
print_info()[source]
class WeatherRoutingTool.constraints.constraints.RunTestContinuousChecks(test_dict)[source]

Bases: ContinuousCheck

check_crossing(lat_start, lon_start, lat_end, lon_end, time=None)[source]
connect_database()[source]

Connect to the database

print_info()[source]
class WeatherRoutingTool.constraints.constraints.SeamarkCrossing(is_stay_on_map=None, map_size=None, db_engine=None)[source]

Bases: ContinuousCheck

Contains various functions to test data connection, gathering and use for obtaining spatial relations for the continuous check in the negative constraints

build_seamark_query(is_stay_on_map=None, map_size=None)[source]

TODO: add description _summary_

Parameters:
  • is_stay_on_map (_type_, optional) – _description_, defaults to None

  • map_size (_type_, optional) – _description_, defaults to None

Returns:

_description_

Return type:

_type_

check_crossing(lat_start, lon_start, lat_end, lon_end)[source]

Check if certain route crosses specified seamark objects

Parameters:
  • lat_start (numpy.ndarray) – array of all origin latitudes of routing segments

  • lon_start (numpy.ndarray) – array of all origin longitudes of routing segments

  • lat_end (numpy.ndarray) – array of all destination latitudes of routing segments

  • lon_end (numpy.ndarray) – array of all destination longitudes of routing segments

Returns:

list of spatial relation result (True or False)

Return type:

list[bool]

concat_nodes_ways(db_engine, query)[source]

Create new GeoDataFrame using public.ways and public.nodes table together in the query

Parameters:
  • query (list) – sql query for table ways

  • db_engine (sqlalchemy.engine.Engine) – sqlalchemy engine

Returns:

gdf including all the features from public.ways and public.nodes table

Return type:

geopandas.GeoDataFrame

concat_tree: STRtree
database: str
host: str
password: str
port: str
predicates: list
query_nodes(db_engine, query=None)[source]

Create new GeoDataFrame using public.nodes table in the query

Parameters:
  • db_engine (sqlalchemy.engine.Engine) – sqlalchemy engine

  • query (str, optional) – sql query for table nodes, defaults to None

Returns:

gdf including all the features from public.nodes table

Return type:

GeoDataFrame

query_ways(db_engine, query)[source]

Create new GeoDataFrame using public.ways table in the query

Parameters:
  • db_engine (sqlalchemy.engine.Engine) – sqlalchemy engine

  • query (str) – sql query for table ways

Returns:

gdf including all the features from public.ways table

Return type:

GeoDataFrame

set_STRTree(db_engine=None, query=None)[source]
tags: list
user: str
class WeatherRoutingTool.constraints.constraints.StatusCodeError(courses_path)[source]

Bases: NegativeContraint

Negative constraint for points where mariPower returns a status code of 3 (=error). FIXME: currently, this constraint is added as ‘continuous’ constraint, but a ‘discrete’ constraint would be more suitable/intuitive. However, this cannot be used at the moment because discrete constraints are checked on intermediate points between two consecutive routing points and the status code is not available for these.

check_crossing(lat_start=None, lon_start=None, lat_end=None, lon_end=None, current_time=None)[source]
courses_path: str
load_data_from_file(courses_path)[source]
class WeatherRoutingTool.constraints.constraints.StayOnMap[source]

Bases: NegativeContraint

Constraint such that the boat can’t leave the map that has weather data available

constraint_on_point(lat, lon, time)[source]
lat1: float
lat2: float
lon1: float
lon2: float
print_info()[source]
set_map(lat1, lon1, lat2, lon2)[source]
class WeatherRoutingTool.constraints.constraints.WaterDepth(data_mode, min_depth, map_size, depth_path='')[source]

Bases: NegativeContraint

Constraint such that the boat can’t enter areas where the water depth is below a minimum water depth

check_depth(lat, lon, time)[source]
constraint_on_point(lat, lon, time)[source]
current_depth: ndarray
depth_data: xarray
get_current_depth(lat, lon)[source]
load_data_ODC(depth_path, product_name, measurements=None)[source]

Load depth data from ODC

Parameters:
  • depth_path (str) – Path to where depth data should be written to

  • product_name (str) – Name of the ODC dataset

  • measurements (list[str], optional) – A list of band names to load from the product_name dataset, defaults to None

Raises:
  • ValueError – product_name is not known on ODC

  • KeyError – measurement in measurements is not known on ODC

Returns:

Depth data loaded from ODC

Return type:

xarray.Dataset

load_data_automatic(depth_path)[source]

Load data from NCEI

Parameters:

depth_path (str) – Path to where depth data should be written to

Returns:

Depth data loaded from NCEI

Return type:

xarray.Dataset

load_data_from_file(depth_path)[source]

Load depth data from given file

Parameters:

depth_path (str) – Path to the depth data

Returns:

Depth data loaded from file

Return type:

xarray.Dataset

map_size: Map
min_depth: float
plot_constraint(fig, ax)[source]
plot_depth_map_from_file(path)[source]
print_info()[source]
set_draught(depth)[source]
class WeatherRoutingTool.constraints.constraints.WaveHeight[source]

Bases: NegativeConstraintFromWeather

Constraint such that the boat can’t enter areas where a certain wave height is exceeded

constraint_on_point(lat, lon, time)[source]
current_wave_height: ndarray
max_wave_height: float
print_info()[source]

WeatherRoutingTool.constraints.route_postprocessing module

class WeatherRoutingTool.constraints.route_postprocessing.RoutePostprocessing(min_fuel_route, boat, db_engine=None)[source]

Bases: object

Currently, RoutePostprocessing focuses on Traffic Separation Schemes (TSS). In the future, it should be integrated into a more general approach.

boat: Boat
calculate_angle_from_slope(s1, s2)[source]

Calculate angle between two lines when the slopes of the two lines are given.

Parameters:
  • s1 (numeric)

  • s2 (numeric)

Returns:

angle

Return type:

numeric
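For reference, the textbook relation for the angle between two lines with slopes s1 and s2 is tan(theta) = |(s2 - s1) / (1 + s1 * s2)|. The sketch below applies it; the function name angle_between_slopes is hypothetical, and whether the tool returns degrees or radians, or the acute or obtuse angle, is not stated in this reference, so degrees and the acute angle are assumptions.

```python
import math


def angle_between_slopes(s1: float, s2: float) -> float:
    """Acute angle in degrees between two lines with slopes s1 and s2."""
    denominator = 1.0 + s1 * s2
    if math.isclose(denominator, 0.0, abs_tol=1e-12):
        # s1 * s2 == -1 means the lines are perpendicular.
        return 90.0
    return math.degrees(math.atan(abs((s2 - s1) / denominator)))


angle_diagonal = angle_between_slopes(0.0, 1.0)   # horizontal line vs. diagonal
angle_right = angle_between_slopes(2.0, -0.5)     # perpendicular lines
```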

calculate_angle_of_current_crossing(start_node, end_node, separation_lane_gdf, intersecting_route_seg_geom)[source]

Calculate the angle between the intersecting separation lane segment and the straight line that starts at the point before the first intersection and ends at the point after the last intersection. A separation lane can contain multiple line segments.

Parameters:
  • start_node (shapely.Point) – starting node of the straight line

  • end_node (shapely.Point) – ending node of the straight line

  • separation_lane_gdf (geopandas.GeoDataFrame) – separation lanes

  • intersecting_route_seg_geom (shapely.LineString) – route segment which is intersecting the separation lane

Returns:

Return type:

tuple

calculate_slope(x1, y1, x2, y2)[source]

Calculate the slope of a line from the two given points on the same line.

Parameters:
  • x1 (numeric)

  • y1 (numeric)

  • x2 (numeric)

  • y2 (numeric)

Returns:

slope

Return type:

numeric
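A minimal sketch of the slope calculation follows. The name slope and the handling of vertical lines (returning math.inf) are assumptions made for this illustration; the reference does not say how the tool treats x1 == x2.

```python
import math


def slope(x1: float, y1: float, x2: float, y2: float) -> float:
    """Slope of the line through (x1, y1) and (x2, y2)."""
    if x1 == x2:
        # Vertical line: the slope is undefined, represented here as infinity.
        return math.inf
    return (y2 - y1) / (x2 - x1)


rising = slope(0.0, 0.0, 2.0, 4.0)
vertical = slope(1.0, 0.0, 1.0, 5.0)
```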

calculate_timsestamp(lat, lon, start_times, node_index, speed)[source]

Calculate the time taken to travel the previous route segment using time = distance / ship speed, then add it to the previous timestamp to get the new start time.

Parameters:
  • lat (numpy.ndarray)

  • lon (numpy.ndarray)

  • start_times (list[datetime.datetime])

  • node_index (int)

  • speed (float)

Returns:

time stamps

Return type:

datetime.datetime
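The relation time = distance / speed can be sketched as follows. The names haversine_m and next_start_time are hypothetical, the distance is computed with the haversine formula on a spherical Earth, and the speed is assumed to be in metres per second; the tool may use a different distance calculation or unit.

```python
import math
from datetime import datetime, timedelta

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius, spherical approximation


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def next_start_time(lats, lons, start_times, node_index, speed_mps):
    """Start time at node_index, derived from the previous node's start time."""
    distance = haversine_m(lats[node_index - 1], lons[node_index - 1],
                           lats[node_index], lons[node_index])
    return start_times[node_index - 1] + timedelta(seconds=distance / speed_mps)


t0 = datetime(2024, 1, 1, 0, 0)
# One degree of latitude due north at 10 m/s.
arrival = next_start_time([54.0, 55.0], [5.0, 5.0], [t0], node_index=1, speed_mps=10.0)
```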

check_valid_crossing(separation_lanes_data_gdf, last_node_of_first_route_seg, first_node_of_last_route_seg)[source]

Check whether the straight line that starts at the point before the first intersection and ends at the point after the last intersection makes an angle between 60 and 120 degrees with the separation lane nearest to the starting point.

Parameters:
  • separation_lanes_data_gdf (geopandas.GeoDataFrame)

  • last_node_of_first_route_seg (shapely.Point)

  • first_node_of_last_route_seg (shapely.Point)

Returns:

Return type:

tuple(bool, shapely.LineString)
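The 60 to 120 degree criterion can be illustrated with plain vector arithmetic. This sketch is not the tool's implementation: crossing_angle_deg and is_valid_crossing are hypothetical names, coordinates are treated as planar (x, y) tuples, and the bounds are assumed to be inclusive.

```python
import math


def crossing_angle_deg(route_start, route_end, lane_start, lane_end):
    """Angle in [0, 180] degrees between the route direction and the lane direction."""
    rx, ry = route_end[0] - route_start[0], route_end[1] - route_start[1]
    lx, ly = lane_end[0] - lane_start[0], lane_end[1] - lane_start[1]
    cos_angle = (rx * lx + ry * ly) / (math.hypot(rx, ry) * math.hypot(lx, ly))
    # Clamp against rounding errors before calling acos.
    return math.degrees(math.acos(max(-1.0, min(1.0, cos_angle))))


def is_valid_crossing(angle_deg, lower=60.0, upper=120.0):
    """True if the crossing angle lies within the accepted range."""
    return lower <= angle_deg <= upper


perpendicular = crossing_angle_deg((0.0, 0.0), (0.0, 1.0), (0.0, 0.0), (1.0, 0.0))
shallow = crossing_angle_deg((0.0, 0.0), (1.0, 0.1), (0.0, 0.0), (1.0, 0.0))
```

A route that runs almost parallel to the lane, like the second example, would be rejected under these assumptions.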

connect_database()[source]
Returns:

database engine

Return type:

sqlalchemy.engine.Engine

connect_route_segments(first_route_seg_gdf, separation_lane_gdf, last_route_seg_gdf, route_segments_gdf=None)[source]

_summary_

Parameters:
  • first_route_seg_gdf (geopandas.GeoDataFrame)

  • separation_lane_gdf (geopandas.GeoDataFrame)

  • last_route_seg_gdf (geopandas.GeoDataFrame)

  • route_segments_gdf (geopandas.GeoDataFrame)

Returns:

geodataframe of the final route

Return type:

geopandas.GeoDataFrame

create_first_connecting_seg(first_route_seg_gdf, separation_lane_gdf)[source]

Build the LineString between the route node before the first intersection and the first node of the separation lane segment

Parameters:
  • first_route_seg_gdf (geopandas.GeoDataFrame)

  • separation_lane_gdf (geopandas.GeoDataFrame)

Returns:

Return type:

shapely.LineString

create_first_connecting_seg_from_node(route_segment_gdf, separation_lane_gdf)[source]
Parameters:
  • route_segment_gdf (geopandas.GeoDataFrame)

  • separation_lane_gdf (geopandas.GeoDataFrame)

Returns:

Return type:

shapely.LineString

create_last_connecting_line(last_route_seg_gdf, separation_lane_gdf)[source]

Build the LineString between the last node of the separation lane and the first route node after the last intersection.

Parameters:
  • last_route_seg_gdf (geopandas.GeoDataFrame)

  • separation_lane_gdf (geopandas.GeoDataFrame)

Returns:

Return type:

shapely.LineString

create_last_connecting_line_from_node(route_segment_gdf, separation_lane_gdf)[source]

Build the LineString between the last node of the separation lane and the first route node after the last intersection.

Parameters:
  • route_segment_gdf (geopandas.GeoDataFrame)

  • separation_lane_gdf (geopandas.GeoDataFrame)

Returns:

Return type:

shapely.LineString

create_route_segments()[source]

Create relevant line segments from the nodes (coordinates) of the minimum fuel route

find_first_node_of_route_seg(route_seg_gdf)[source]

Find the node of the route segments after the last intersecting point

find_last_node_of_route_seg(first_route_seg_gdf)[source]

Find the node of the route segments before the first intersecting point

find_point_from_perpendicular_angle(start_node, segment)[source]

Find the point on a line segment at which the line drawn from a given point meets the segment at a right angle.

Parameters:
  • start_node (shapely.Point) – given point

  • segment – given line segment

Returns:

coordinates of the point on the line which makes a right angle from the given point

Return type:

tuple
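Geometrically, this is the orthogonal projection of the point onto the line through the segment. The sketch below uses plain tuples instead of shapely objects and a hypothetical function name; it does not clamp the result to the segment, so the returned point may lie on the segment's extension.

```python
def perpendicular_foot(point, seg_start, seg_end):
    """Project point onto the line through seg_start and seg_end."""
    px, py = point
    ax, ay = seg_start
    bx, by = seg_end
    dx, dy = bx - ax, by - ay
    length_sq = dx * dx + dy * dy
    if length_sq == 0.0:
        # Degenerate segment: both ends coincide.
        return (ax, ay)
    # Fraction along the segment at which the perpendicular meets the line.
    t = ((px - ax) * dx + (py - ay) * dy) / length_sq
    return (ax + t * dx, ay + t * dy)


foot = perpendicular_foot((1.0, 1.0), (0.0, 0.0), (2.0, 0.0))
```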

find_seamark_intersections(route_segments_gdf, seamark_gdf)[source]

First, find the intersection points of the route segments with any seamark object. If there are any, iterate over the route segments and the intersection points to identify which route segment each intersection point lies on. The index of each such route segment is added to the list of intersected route segment indices.

Parameters:
  • route_segments_gdf (geopandas.GeoDataFrame) – geodataframe of ship route segments

  • seamark_gdf (geopandas.GeoDataFrame) – geodataframe of all seamark TSS objects within the bounding box of the route

Returns:

The list of indices of the route segments that intersect the TSS seamark objects

Return type:

list
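The idea of collecting the indices of intersected route segments can be sketched without geopandas. The helper names below are hypothetical, seamark objects are reduced to plain line segments, and the orientation test used here does not detect collinear overlaps; the tool itself works on GeoDataFrames.

```python
def _orientation(p, q, r):
    """Sign of the cross product (q - p) x (r - p): -1, 0 or 1."""
    value = (q[0] - p[0]) * (r[1] - p[1]) - (q[1] - p[1]) * (r[0] - p[0])
    return (value > 0) - (value < 0)


def segments_intersect(a, b, c, d):
    """True if segment ab crosses segment cd (collinear overlaps are not detected)."""
    return (_orientation(a, b, c) != _orientation(a, b, d)
            and _orientation(c, d, a) != _orientation(c, d, b))


def intersected_segment_indices(route_nodes, seamark_segments):
    """Indices i of route segments (node i to node i + 1) that cross any seamark segment."""
    indices = []
    for i in range(len(route_nodes) - 1):
        a, b = route_nodes[i], route_nodes[i + 1]
        if any(segments_intersect(a, b, c, d) for c, d in seamark_segments):
            indices.append(i)
    return indices


route = [(0.0, 0.0), (2.0, 0.0), (4.0, 0.0), (6.0, 0.0)]
seamarks = [((3.0, -1.0), (3.0, 1.0))]
hit_indices = intersected_segment_indices(route, seamarks)
```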

find_seperation_lane_to_follow(last_node, seperation_lanes_gdf)[source]

The direction of a separation lane is determined by the order of the points in its linestring. Hence, it is assumed that the separation lane with the nearest starting point (when lanes run in parallel and in opposite directions) needs to be followed. To find it, the distances between the route node before the first intersection and the starting node of each separation lane are calculated, and the separation lane with the minimum distance is selected.

Parameters:
  • last_node (shapely.Point) – the node of the route segments before the first intersecting point

  • seperation_lanes_gdf (geopandas.GeoDataFrame) – geodataframe of separation lanes

Returns:

geodataframe of the separation lane that needs to be followed

Return type:

geopandas.GeoDataFrame
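The selection rule described above reduces to a minimum over distances to the first vertex of each lane. The following sketch uses lists of (x, y) tuples and Euclidean distance as stand-ins for the GeoDataFrame and shapely geometries; lane_to_follow is a hypothetical name.

```python
import math


def lane_to_follow(last_node, lanes):
    """Return the lane whose first vertex is closest to last_node."""
    return min(lanes, key=lambda lane: math.dist(last_node, lane[0]))


# Two parallel lanes running in opposite directions.
northbound = [(0.0, 0.0), (0.0, 5.0)]
southbound = [(1.0, 5.0), (1.0, 0.0)]
chosen = lane_to_follow((0.9, 5.5), [northbound, southbound])
```

Because the route node sits near the northern end, the lane that starts there (the southbound one) is picked, which matches the intended travel direction.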

get_route_bbox()[source]
is_start_or_finish_node_in_separation_zone(route_segments_gdf, seamark_gdf)[source]

Check whether any seamark TSS object contains the starting or ending node of the route. If so, the route is not forwarded for postprocessing.

Parameters:
  • route_segments_gdf (geopandas.GeoDataFrame) – geodataframe of ship route segments

  • seamark_gdf (geopandas.GeoDataFrame) – geodataframe of all seamark TSS objects within the bounding box of the route

Returns:

Return type:

bool
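As an illustration of the containment check, the sketch below tests the first and last route node against polygonal zones with a ray-casting test. All names are hypothetical and zones are plain vertex lists; the tool relies on geopandas geometries instead.

```python
def point_in_polygon(point, polygon):
    """Ray-casting test; polygon is a list of (x, y) vertices (not repeated at the end)."""
    x, y = point
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Only edges that straddle the horizontal line through the point count.
        if (y1 > y) != (y2 > y):
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


def start_or_finish_in_zone(route_nodes, zones):
    """True if the first or the last route node lies inside any zone."""
    endpoints = (route_nodes[0], route_nodes[-1])
    return any(point_in_polygon(p, zone) for p in endpoints for zone in zones)


zone = [(0.0, 0.0), (2.0, 0.0), (2.0, 2.0), (0.0, 2.0)]
starts_inside = start_or_finish_in_zone([(1.0, 1.0), (5.0, 5.0)], [zone])
passes_through = start_or_finish_in_zone([(-1.0, -1.0), (1.0, 1.0), (5.0, 5.0)], [zone])
```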

lats_per_step: list
lons_per_step: list
post_process_route()[source]

TODO: add class description _summary_

query_data(query, engine)[source]
Parameters:
  • query (str) – database query

  • engine (sqlalchemy.engine.Engine) – sqlalchemy engine

Returns:

dataframe

Return type:

geopandas.GeoDataFrame

recalculate_starttime_per_node(final_route)[source]

To recalculate the start times of the new route segments, a new integer index is first set on the dataframe of final route segments. Then the index of the first missing (not available) timestamp is located, and the new start times are calculated from that index to the end of the dataframe.

Parameters:

final_route (geopandas.GeoDataFrame)

Returns:

start times

Return type:

list[datetime.datetime]
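The procedure can be sketched on plain lists. In this illustration, None marks a missing timestamp and travel_seconds is a hypothetical list holding the travel time of the segment that ends at each node; the tool operates on a GeoDataFrame and derives the travel time from distance and speed.

```python
from datetime import datetime, timedelta


def fill_missing_start_times(start_times, travel_seconds):
    """Recalculate start times from the first missing entry to the end of the list.

    travel_seconds[i] is the travel time of the segment ending at node i
    (the entry at index 0 is unused).
    """
    filled = list(start_times)
    first_missing = next((i for i, t in enumerate(filled) if t is None), None)
    if first_missing is None:
        return filled  # nothing to recalculate
    if first_missing == 0:
        raise ValueError("the first start time must be known")
    for i in range(first_missing, len(filled)):
        filled[i] = filled[i - 1] + timedelta(seconds=travel_seconds[i])
    return filled


t0 = datetime(2024, 1, 1, 0, 0)
known = [t0, t0 + timedelta(hours=1), None, None]
recalculated = fill_missing_start_times(known, [0, 3600, 1800, 600])
```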

retrieve_seamark_data(bbox_wkt, engine)[source]

Retrieve all the seamark objects within the bounding box

Parameters:
  • bbox_wkt (str) – bounding box wkt

  • engine (sqlalchemy.engine.Engine) – sqlalchemy engine

Returns:

dataframe

Return type:

geopandas.GeoDataFrame

retrieve_seperation_lane_data(bbox_wkt, engine)[source]
route: RouteParams
set_data(route, boat)[source]
ship_speed: list
starttime_per_step: list
terminate(route_lons, route_lats, starttime_list, boat_speed)[source]

Find the courses from route_dict to calculate the ship_parameters

Parameters:
  • route_lons (numpy.ndarray)

  • route_lats (numpy.ndarray)

  • starttime_list (list[datetime.datetime])

  • boat_speed (float)

Returns:

final route

Return type:

RouteParams

Module contents