import geopandas as gpd
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
import scipy
import math
import numpy as np
from geopy.distance import geodesic as GD
from shapely.geometry import box
from shapely.geometry import LineString, Point
import shapely
from collections import defaultdict
import folium
from IPython.display import display, HTML
import random
[docs]
def convert_to_points(coord_list):
    """Turn a sequence of coordinate pairs into Shapely ``Point`` objects.

    Args:
        coord_list (list): Coordinate pairs, each in (lon, lat) order.

    Returns:
        list: One ``Point`` per input pair, in the same order as the input.
    """
    return list(map(Point, coord_list))
[docs]
def process_data(df):
    """Expand list-formatted trajectories into one Shapely Point per row.

    Note:
        The ``geometry`` column of ``df`` is overwritten in place with lists
        of ``Point`` objects before the frame is exploded, so the caller's
        DataFrame is modified by this call.

    Args:
        df (pd.DataFrame): Frame with at least a ``geometry`` column whose
            entries are lists of coordinate pairs.

    Returns:
        gpd.GeoDataFrame: One row per trajectory point, tagged with the
        WGS84 reference system (EPSG:4326), i.e. (lon, lat) pairs.
    """
    # Registers ``progress_apply`` on pandas objects.
    tqdm.pandas()
    point_lists = df['geometry'].progress_apply(convert_to_points)
    df['geometry'] = point_lists
    exploded = df.explode('geometry')
    return gpd.GeoDataFrame(exploded, geometry='geometry', crs="EPSG:4326")
[docs]
class ConvertToToken:
    """Tokenize trajectories by snapping their points onto a regular grid."""

    def __init__(self, df, area, cell_size):
        """Store the grid parameters and explode the trajectories into points.

        Args:
            df (pd.DataFrame): Frame with at least a 'geometry' column, each
                row being a list of coordinate pairs in (lon, lat) format.
                ``create_tokens()`` additionally groups by 'trip_id'. The
                frame is passed to ``process_data``, which rewrites its
                'geometry' column in place.
            area (gpd.GeoDataFrame): Geometry delimiting the study region;
                only its ``total_bounds`` are read by this class.
            cell_size (int): Side length of a square grid cell, in meters.
        """
        self.cell_size = cell_size
        self.gdf = process_data(df)
        self.area = area

    def create_grid(self):
        """Build a regular grid of box cells covering the area's bounding box.

        The requested cell size (meters) is converted to degrees separately
        for latitude and longitude, using geodesic distances measured along
        the east edge (height) and the south edge (width) of the bounding
        box. Cells are generated column by column (west to east), and within
        each column from south to north.

        Returns:
            tuple: A tuple containing:
                - cell (gpd.GeoDataFrame): grid cells as box geometries in
                  the 'geometry' column, CRS EPSG:4326.
                - n_rows (int): number of cells in each column.
                - int: total number of cells created.

        Raises:
            ValueError: If ``cell_size`` is not positive or the bounding box
                has zero width or height.
        """
        if self.cell_size <= 0:
            raise ValueError("cell_size must be a positive number of meters")

        # Bounding box as (min_lon, min_lat, max_lon, max_lat).
        xmin, ymin, xmax, ymax = self.area.total_bounds

        # geopy expects (lat, lon) pairs; distances are in meters.
        height = GD((ymin, xmax), (ymax, xmax)).m
        width = GD((ymin, xmin), (ymin, xmax)).m
        if height <= 0 or width <= 0:
            raise ValueError("area must have a non-zero width and height")

        # Fractional number of cells per axis, then the cell extent in degrees.
        n_cells_h = height / self.cell_size
        n_cells_w = width / self.cell_size
        cell_size_h = (ymax - ymin) / n_cells_h
        cell_size_w = (xmax - xmin) / n_cells_w

        # Lower-left corners; computed once so the row count is well defined
        # even before any cell is built.
        x_origins = np.arange(xmin, xmax, cell_size_w)
        y_origins = np.arange(ymin, ymax, cell_size_h)
        n_rows = len(y_origins)

        grid_cells = [
            box(x0, y0, x0 + cell_size_w, y0 + cell_size_h)
            for x0 in x_origins
            for y0 in y_origins
        ]
        cell = gpd.GeoDataFrame(grid_cells, columns=['geometry'], crs="EPSG:4326")
        print('Number of created cells: ', cell.shape[0])
        return cell, n_rows, cell.shape[0]

    def assign_ids(self, grid, n_rows):
        """Give every grid cell a unique (column_index, row_index) ID.

        IDs start at (0, 0) and follow the column-major order in which
        ``create_grid()`` emits cells. The 'ID' column is added to ``grid``
        in place.

        Args:
            grid (gpd.GeoDataFrame): Area grid returned from create_grid().
            n_rows (int): Number of cells per column in the grid.

        Returns:
            gpd.GeoDataFrame: The same ``grid`` object, now carrying an 'ID'
            column of (col_index, row_index) tuples.
        """
        n_cols = grid.shape[0] // n_rows
        grid['ID'] = [(col, row) for col in range(n_cols) for row in range(n_rows)]
        return grid

    def find_grid_center(self, grid):
        """Compute the centroid of every grid cell.

        The grid is projected to EPSG:3857 for the centroid computation and
        the resulting points are converted back to EPSG:4326.

        Args:
            grid (gpd.GeoDataFrame): Grid with cell geometry and 'ID' columns.

        Returns:
            gpd.GeoDataFrame: A new frame with 'geometry' (cell centroid as a
            Point) and 'ID' columns, in the same row order as ``grid``.
        """
        centroids_4326 = grid.to_crs("EPSG:3857").centroid.to_crs("EPSG:4326")
        # Pair geometries and IDs positionally so a non-default index on
        # ``grid`` cannot misalign them.
        return gpd.GeoDataFrame(
            {"geometry": list(centroids_4326), "ID": grid["ID"].tolist()},
            geometry="geometry",
            crs="EPSG:4326",
        )

    def merge_with_polygon(self, grid):
        """Attach to every trajectory point the ID of the cell containing it.

        Args:
            grid (gpd.GeoDataFrame): Grid with cell geometry and 'ID' columns.

        Returns:
            gpd.GeoDataFrame: The trajectory points with an additional 'ID'
            column; points that matched no cell are removed.
        """
        # The predicate is 'within' (not 'intersects'): a point is matched
        # only to a cell it falls inside, so every point gets at most the
        # cells that actually contain it.
        merged_gdf = gpd.sjoin(self.gdf, grid, how='left', predicate='within')
        merged_gdf = merged_gdf.drop(columns=['index_right'])
        # Unmatched points carry NaN in 'ID' after the left join.
        return merged_gdf.dropna(subset=['ID'])

    def create_tokens(self):
        """Convert raw coordinate pairs into grid-cell tokens.

        Builds the grid, assigns (col_index, row_index) IDs, computes cell
        centroids, joins every trajectory point to its cell and groups the
        result per trip. Consecutive repeats of the same cell ID are kept
        as-is in the returned sequences.

        Returns:
            tuple: A tuple containing:
                - grid_center (gpd.GeoDataFrame): 'geometry' (cell centroid)
                  and 'ID' columns.
                - grouped_df (pd.DataFrame): indexed by 'trip_id', with a
                  'geometry' column (list of Points per trip) and an 'ID'
                  column (list of cell IDs per trip).
        """
        grid, n_rows, _ = self.create_grid()
        assigned_grid = self.assign_ids(grid, n_rows)
        grid_center = self.find_grid_center(assigned_grid)
        merged_gdf = self.merge_with_polygon(assigned_grid)
        grouped_df = merged_gdf.groupby('trip_id').agg({'geometry': list, 'ID': list})
        return grid_center, grouped_df
[docs]
class NgramGenerator:
    """Build bigram/trigram frequency tables from tokenized trajectories."""

    def __init__(self, sentence_gdf):
        """Read the token sequences out of the 'ID' column.

        Args:
            sentence_gdf (pd.DataFrame): One row per trip, with an 'ID'
                column holding a list of grid-cell ID tuples for that trip
                (typically the second output of
                ConvertToToken.create_tokens()).
        """
        self.sentences = sentence_gdf['ID'].values.tolist()

    @staticmethod
    def _collapse_repeats(sentence):
        """Return ``sentence`` with runs of equal consecutive tokens reduced to one."""
        return [tok for i, tok in enumerate(sentence) if i == 0 or tok != sentence[i - 1]]

    @staticmethod
    def _count_bigrams_and_trigrams(sentences):
        """Count every bigram and trigram in ``sentences`` in a single pass."""
        bigrams = {}
        trigrams = {}
        for sentence in tqdm(sentences):
            for i in range(len(sentence) - 1):
                bigram = tuple(sentence[i:i + 2])
                bigrams[bigram] = bigrams.get(bigram, 0) + 1
            for i in range(len(sentence) - 2):
                trigram = tuple(sentence[i:i + 3])
                trigrams[trigram] = trigrams.get(trigram, 0) + 1
        return bigrams, trigrams

    def find_start_end_points(self):
        """Extract the start and end bigrams of every trajectory.

        Consecutive duplicate cells are collapsed first, so the start/end
        bigrams always consist of two different cells.

        Returns:
            list: One two-element list per qualifying trip:
            ``[(first, second), (second_to_last, last)]``. Only trips with
            more than three cells after collapsing repeats are included.
        """
        start_end_points = []
        for sentence in map(self._collapse_repeats, self.sentences):
            if len(sentence) > 3:
                start_end_points.append(
                    [(sentence[0], sentence[1]), (sentence[-2], sentence[-1])]
                )
        return start_end_points

    def reverse_sentences(self, sentences):
        """Reverse every trajectory sequence.

        Args:
            sentences (list): List of lists of cell IDs.

        Returns:
            list: A new list in which each inner list is the reversed copy
            of the corresponding input sequence.
        """
        return [sent[::-1] for sent in sentences]

    def create_ngrams(self):
        """Count bigrams and trigrams in the original and reversed sequences.

        The sequences are used exactly as stored in ``self.sentences`` (no
        collapsing of repeated cells). The number of unique bigrams and
        trigrams of the original sequences is printed.

        Returns:
            tuple: A tuple containing:
                - ngrams (dict): four dictionaries keyed
                  'bigrams_original', 'bigrams_reversed',
                  'trigrams_original' and 'trigrams_reversed', each mapping
                  an n-gram tuple of cell IDs to its occurrence count.
                - start_end_points (list): as returned by
                  find_start_end_points().
        """
        start_end_points = self.find_start_end_points()
        sentences_reversed = self.reverse_sentences(self.sentences)

        bigrams_reversed, trigrams_reversed = self._count_bigrams_and_trigrams(sentences_reversed)
        bigrams_original, trigrams_original = self._count_bigrams_and_trigrams(self.sentences)

        print(f"\nNumber of Unique Bigrams: {len(bigrams_original)} \nNumber of Unique Trigrams: {len(trigrams_original)}")
        ngrams = {
            'bigrams_original': bigrams_original,
            'bigrams_reversed': bigrams_reversed,
            'trigrams_original': trigrams_original,
            'trigrams_reversed': trigrams_reversed,
        }
        return ngrams, start_end_points
[docs]
def process_trigrams(trigrams):
    """Index trigram counts by their two-token prefix.

    Regroups ``{(token_1, token_2, token_3): count}`` so that each
    (token_1, token_2) prefix maps to the list of tokens that followed it,
    together with how often each trigram occurred. This layout supports
    next-point prediction from a two-token context.

    Args:
        trigrams (dict): Trigram tuples mapped to their occurrence count,
            formatted as {(token_1, token_2, token_3): count}.

    Returns:
        defaultdict: {(token_1, token_2): [(token_3, count), ...]}, with
        entries in the iteration order of ``trigrams``.
    """
    continuations = defaultdict(list)
    for ngram, occurrences in trigrams.items():
        context, follower = ngram[:2], ngram[2]
        continuations[context].append((follower, occurrences))
    return continuations
[docs]
def process_trigrams_2(trigrams):
    """Index the middle token of each trigram by its outer two tokens.

    Produces a lookup from (first_token, third_token) to every second token
    observed between them, which is useful for finding a "bridge" cell
    between two non-adjacent grid cells. Occurrence counts are not carried
    over.

    Args:
        trigrams (dict): Trigram tuples mapped to their occurrence count,
            formatted as {(token_1, token_2, token_3): count}.

    Returns:
        defaultdict: {(first_token, third_token): [middle_token, ...]}.
    """
    bridges = defaultdict(list)
    for ngram in trigrams:
        outer_pair = (ngram[0], ngram[-1])
        bridges[outer_pair].append(ngram[1])
    return bridges
[docs]
class TrajGenerator:
    """Generate synthetic token trajectories from n-gram statistics."""

    # Maximum number of bidirectional extension steps in a single attempt of
    # generate_sentences_using_origin_destination().
    MAX_EXTENSION_STEPS = 40
    # Number of attempts before generate_sentences_using_origin_destination()
    # gives up and returns an empty list.
    MAX_ATTEMPTS = 3

    def __init__(self, ngrams, start_end_points, n, grid, k=3):
        """Initialize a generator with n-grams and grid information.

        Args:
            ngrams (dict): Must contain 'trigrams_original' and
                'trigrams_reversed', each mapping trigram tuples to counts.
                ('bigrams_original' / 'bigrams_reversed' may be present but
                are not read here.)
            start_end_points (list): Items of the form
                ``[(first, second), (second_to_last, last)]``.
            n (int): Number of trajectories to generate.
            grid (gpd.GeoDataFrame): Frame with a 'geometry' column (cell
                centroids) and an 'ID' column (cell ID tuples).
            k (int): How many top candidates to keep per side, and how many
                candidate pairs to keep, in find_next_tokens(). Defaults
                to 3.
        """
        originals = ngrams['trigrams_original']
        reverses = ngrams['trigrams_reversed']
        # Combined count of each trigram across original and reversed corpora.
        self.trigrams = {
            key: originals.get(key, 0) + reverses.get(key, 0)
            for key in set(originals) | set(reverses)
        }
        self.trigram_dict = process_trigrams(self.trigrams)
        self.trigram_dict_original = process_trigrams(originals)
        self.trigrams_dict_2 = process_trigrams_2(self.trigrams)
        self.start_end_points = start_end_points
        self.grid_center = grid
        self.num_sentences = n
        self.k = k

    @staticmethod
    def start_path(start, end):
        """Create an initial 4-point path from a start and an end bigram.

        Finds the pair of points (one from ``start``, one from ``end``) with
        the smallest Euclidean distance and places them in the middle; the
        remaining points become the outer ends.

        Args:
            start (tuple): Two (x, y) points: first and second point of a
                trip.
            end (tuple): Two (x, y) points: second-to-last and last point of
                a trip.

        Returns:
            list: ``[outer_point1, close_point1, close_point2, outer_point2]``.
        """
        # min() returns the first minimal pair, matching a strict '<' scan.
        closest_pair = min(
            ((point1, point2) for point1 in start for point2 in end),
            key=lambda pair: TrajGenerator.calculate_distance(pair[0], pair[1]),
        )
        path_start = [point for point in start + end if point not in closest_pair]
        path_start.insert(1, closest_pair[0])
        path_start.insert(2, closest_pair[-1])
        return path_start

    @staticmethod
    def calculate_distance(point1, point2):
        """Return the Euclidean distance between two 2D points.

        Args:
            point1 (tuple): First point as (x, y).
            point2 (tuple): Second point as (x, y).

        Returns:
            float: Non-negative distance.
        """
        x1, y1 = point1
        x2, y2 = point2
        return math.sqrt((x2 - x1)**2 + (y2 - y1)**2)

    def _top_candidates(self, context, visited):
        """Return up to ``k`` unvisited tokens following ``context``, most frequent first."""
        counts = dict(self.trigram_dict.get(context, []))
        unvisited = [(token, count) for token, count in counts.items() if token not in visited]
        # Stable sort: ties keep their original order.
        unvisited.sort(key=lambda item: item[1], reverse=True)
        return [token for token, _ in unvisited[:self.k]]

    def find_next_tokens(self, left, right, path_sentence):
        """Propose token pairs that extend both working edges of a path.

        For each edge, the up-to-``k`` most frequent tokens following that
        two-token context in the combined trigram table are collected,
        excluding tokens already on the path. All left/right combinations
        are ranked by Euclidean distance between the two tokens and the
        ``k`` closest pairs are returned.

        Args:
            left (list): Two tokens forming the left edge context.
            right (list): Two tokens forming the right edge context.
            path_sentence (list): Tokens already on the path; these are
                never proposed again.

        Returns:
            list: Up to ``k`` ``(left_token, right_token)`` tuples, closest
            pair first. Empty if either side has no candidate.
        """
        visited = set(path_sentence)
        left_candidates = self._top_candidates(tuple(left), visited)
        right_candidates = self._top_candidates(tuple(right), visited)
        pairs = [(point1, point2) for point1 in left_candidates for point2 in right_candidates]
        pairs.sort(key=lambda pair: TrajGenerator.calculate_distance(pair[0], pair[1]))
        return pairs[:self.k]

    def generate_sentences_using_origin_destination(self):
        """Generate one path between a randomly chosen start and end bigram.

        Starting from the 4-point path of start_path(), one token is added
        next to each working edge per step (picked at random among the pairs
        from find_next_tokens()). After every step the bridge table is
        consulted for tokens observed between the two newest tokens; if
        there are enough, the most frequent one is inserted and the path is
        returned.

        Returns:
            list: The completed path as a list of tokens, or an empty list
            if no path was completed within ``MAX_ATTEMPTS`` attempts of at
            most ``MAX_EXTENSION_STEPS`` steps each.
        """
        random_path = random.choice(self.start_end_points)
        start = random_path[0]
        end = random_path[1]

        for _ in range(self.MAX_ATTEMPTS):
            path_sentence = self.start_path(start, end)
            left = path_sentence[:2]
            right = path_sentence[-2:]
            for i in range(self.MAX_EXTENSION_STEPS):
                points = self.find_next_tokens(left, right, path_sentence)
                if not points:
                    # Nothing changes between iterations, so further steps
                    # of this attempt cannot succeed either.
                    break
                j = random.randint(0, len(points) - 1)
                left = [left[-1], points[j][0]]
                right = [right[-1], points[j][1]]
                # The path holds 4 + 2*i tokens here; the new tokens go right
                # after the i + 2 tokens already on the left side.
                path_sentence.insert(i + 2, left[-1])
                path_sentence.insert(i + 3, right[-1])

                bridge_from, bridge_to = left[-1], right[-1]
                # .get() avoids inserting empty entries into the defaultdict
                # on every failed lookup.
                fills = self.trigrams_dict_2.get((bridge_from, bridge_to), [])
                # NOTE(review): the original comment said a sentence is
                # complete "if one [bridging trigram] exists", but the
                # threshold requires at least two candidates. Kept as-is;
                # confirm whether `> 0` was intended.
                if len(fills) > 1:
                    best_fill = max(
                        fills,
                        key=lambda middle: self.trigrams[(bridge_from, middle, bridge_to)],
                    )
                    path_sentence.insert(i + 3, best_fill)
                    return path_sentence
        return []

    def generate_sentences_using_origin(self, length, seed=None):
        """Generate a path from a start bigram by sampling trigram continuations.

        The next token is drawn with probability proportional to its count
        among the tokens that followed the current two-token context in the
        original (non-reversed) trigrams.

        Args:
            length (int): Target number of tokens. Generation stops early if
                the current context has no continuation. The two tokens of
                the start bigram are always included.
            seed (int, optional): If given, the module-level ``random``
                generator is re-seeded with it before the start bigram is
                picked, making the result reproducible. Defaults to None.

        Returns:
            list: The generated tokens, beginning with the start bigram.
        """
        if seed is not None:
            random.seed(seed)
            sample_size = min(len(self.start_end_points), self.num_sentences)
            # Kept as sample()[0] (rather than choice()) so seeded results
            # stay identical to earlier runs.
            context = random.sample(self.start_end_points, sample_size)[0][0]
        else:
            context = random.choice(self.start_end_points)[0]

        text = list(context)
        while len(text) < length:
            candidates = self.trigram_dict_original.get(context, [])
            if not candidates:
                break
            # Weighted pick: walk the cumulative counts until the drawn
            # value is covered.
            total_count = sum(count for _, count in candidates)
            random_value = random.randint(1, total_count)
            cumulative_count = 0
            next_token = None
            for token, count in candidates:
                cumulative_count += count
                if random_value <= cumulative_count:
                    next_token = token
                    break
            text.append(next_token)
            # Slide the two-token context forward by one.
            context = context[1:] + (next_token,)
        return text

    def convert_sentence_to_traj(self, generated_sentences):
        """Map token sequences to the centroid geometries of their cells.

        Args:
            generated_sentences (list): List of token lists, e.g.
                [[(0, 1), (1, 1), (2, 1)], [(3, 3), (3, 4), (4, 4)]].

        Returns:
            list: One list per sentence holding the 'geometry' value of
            ``grid_center`` for each token. Tokens without a matching 'ID'
            are silently skipped.
        """
        token_to_geometry = dict(zip(self.grid_center['ID'], self.grid_center['geometry']))
        all_points = []
        for sentence in tqdm(generated_sentences):
            all_points.append(
                [token_to_geometry[token] for token in sentence if token in token_to_geometry]
            )
        return all_points

    @staticmethod
    def _trajs_to_frames(trajs):
        """Build the (coordinate-list frame, geometry-list frame) pair for ``trajs``."""
        geom_list = [[[point.x, point.y] for point in traj] for traj in trajs]
        df = pd.DataFrame({'geometry': geom_list})
        df['trip_id'] = range(1, len(df) + 1)
        df = df[['trip_id', 'geometry']]
        gdf = pd.DataFrame({'geometry': trajs})
        gdf['trip_id'] = range(1, len(gdf) + 1)
        gdf = gdf[['trip_id', 'geometry']]
        return df, gdf

    def generate_trajs_using_origin_destination(self):
        """Generate ``num_sentences`` origin-destination trajectories.

        Calls generate_sentences_using_origin_destination() repeatedly and
        keeps only non-empty results; the loop does not end until
        ``num_sentences`` paths have been collected.

        Returns:
            tuple: Two pandas DataFrames with columns 'trip_id' (1..n) and
            'geometry':
                - df: 'geometry' is a list of [x, y] coordinate pairs.
                - gdf: 'geometry' is a list of the centroid geometry objects.
        """
        new_generated_sentences = []
        with tqdm(total=self.num_sentences, desc="Generating sentences") as pbar:
            while len(new_generated_sentences) < self.num_sentences:
                path_sentence = self.generate_sentences_using_origin_destination()
                if path_sentence:
                    new_generated_sentences.append(path_sentence)
                    pbar.update(1)
        new_trajs = self.convert_sentence_to_traj(new_generated_sentences)
        return self._trajs_to_frames(new_trajs)

    def generate_trajs_using_origin(self, sentence_length, seed=None):
        """Generate trajectories of a target length from start bigrams.

        Only sentences with more than ``sentence_length - 5`` tokens are
        kept. Without a seed, generation repeats until ``num_sentences``
        sentences have been kept. With a seed, exactly ``num_sentences``
        attempts are made (each with its own derived seed), so fewer
        trajectories may be returned.

        Args:
            sentence_length (int): Target number of tokens per trajectory.
            seed (int, optional): Seed for reproducible batch generation.
                Defaults to None.

        Returns:
            tuple: Two pandas DataFrames with columns 'trip_id' (1..n) and
            'geometry':
                - df: 'geometry' is a list of [x, y] coordinate pairs.
                - gdf: 'geometry' is a list of the centroid geometry objects.
        """
        new_generated_sentences = []
        min_exclusive_length = sentence_length - 5

        if seed is not None:
            random.seed(seed)
            # One derived seed per attempt, drawn from 1..len(start_end_points).
            random_seeds = [
                random.randint(1, len(self.start_end_points))
                for _ in range(self.num_sentences)
            ]
            with tqdm(total=self.num_sentences, desc="Generating sentences") as pbar:
                for sentence_seed in random_seeds:
                    generated_text = self.generate_sentences_using_origin(sentence_length, sentence_seed)
                    if len(generated_text) > min_exclusive_length:
                        new_generated_sentences.append(generated_text)
                        pbar.update(1)
        else:
            with tqdm(total=self.num_sentences, desc="Generating sentences") as pbar:
                while len(new_generated_sentences) < self.num_sentences:
                    generated_text = self.generate_sentences_using_origin(sentence_length, None)
                    if len(generated_text) > min_exclusive_length:
                        new_generated_sentences.append(generated_text)
                        pbar.update(1)

        new_trajs = self.convert_sentence_to_traj(new_generated_sentences)
        return self._trajs_to_frames(new_trajs)
[docs]
class DisplayTrajs():
    """Visual comparison of original and generated trajectories."""

    def __init__(self, original_trajs, generated_trajs):
        """Store the two trajectory collections to compare.

        Args:
            original_trajs (list): Original trajectories; a list of
                trajectories, each a list of Shapely Point objects, e.g.
                [[Point(x1, y1), Point(x2, y2), ...], ...].
            generated_trajs (list): Generated trajectories in the same
                format as ``original_trajs``.
        """
        self.original_trajs = original_trajs
        self.generated_trajs = generated_trajs

    def plot_map(self, trajs):
        """Draw trajectories as blue polylines on a Folium map.

        The map is centered on the first point of the first non-empty
        trajectory. Trajectories with fewer than two points cannot form a
        line and are skipped.

        Args:
            trajs (list): List of trajectories, each a list of point
                geometries exposing ``x`` (longitude) and ``y`` (latitude).

        Returns:
            folium.Map: Map containing one polyline per drawable trajectory.

        Raises:
            ValueError: If ``trajs`` contains no non-empty trajectory.
        """
        first_point = next((traj[0] for traj in trajs if len(traj) > 0), None)
        if first_point is None:
            raise ValueError("trajs must contain at least one non-empty trajectory")

        # Folium expects (lat, lon), i.e. (y, x).
        center_coords = (first_point.y, first_point.x)
        mymap = folium.Map(location=center_coords, zoom_start=12)
        for points in trajs:
            if len(points) < 2:
                continue
            line = LineString(points)
            line_coords = [(coord[1], coord[0]) for coord in line.coords]
            folium.PolyLine(locations=line_coords, color='blue').add_to(mymap)
        return mymap

    def display_maps(self):
        """Show original (left) and generated (right) trajectories side by side.

        Renders both maps to HTML and displays them in a two-column flex
        layout through IPython's ``display``.
        """
        map1 = self.plot_map(self.original_trajs)
        map2 = self.plot_map(self.generated_trajs)
        html_map1 = map1._repr_html_()
        html_map2 = map2._repr_html_()
        html = f"""
<div style="display: flex; justify-content: space-around;">
<div style="width: 45%;">
<h3 style="text-align: center;">Original Trajectories</h3>
{html_map1}
</div>
<div style="width: 45%;">
<h3 style="text-align: center;">Generated Trajectories</h3>
{html_map2}
</div>
</div>
"""
        # Display the HTML
        display(HTML(html))

    def merge_grid_with_points(self, grid, df, num_cells):
        """Assign every trajectory point to the grid cell that contains it.

        Adds a sequential 'Region' column (0..num_cells-1) to ``grid`` in
        place, explodes ``df`` to one row per point and left-joins the
        points to the grid with the 'within' predicate.

        Args:
            grid (gpd.GeoDataFrame): Grid cells with a 'geometry' column.
            df (pd.DataFrame): Trajectories with a 'geometry' column holding
                a list of point geometries per row.
            num_cells (int): Total number of cells in ``grid``.

        Returns:
            gpd.GeoDataFrame: One row per trajectory point with, in
            addition to the joined columns:
                - 'Region': ID of the containing cell (NaN if none).
                - 'point_region': polygon of the containing cell, or the
                  string 'nan' if the point is in no cell.
        """
        grid['Region'] = list(range(num_cells))
        df = df.explode('geometry')
        gdf = gpd.GeoDataFrame(df, geometry='geometry', crs="EPSG:4326")
        merged_df = gpd.sjoin(gdf, grid, how='left', predicate='within', lsuffix='_points', rsuffix='_grid')
        # Region i was assigned to the i-th row above, so pair positionally.
        region_geometries = dict(zip(grid['Region'], grid['geometry']))
        # Unmatched points have NaN in 'Region', which is never a key.
        merged_df['point_region'] = [
            region_geometries.get(region, 'nan')
            for region in tqdm(merged_df['Region'], total=len(merged_df))
        ]
        return merged_df

    def plot_heat_map(self, df, area, ax, cell_size, title='Generated Trajectories'):
        """Plot the number of trajectory points per grid cell as a heatmap.

        Args:
            df (pd.DataFrame): Trajectories with a 'geometry' column holding
                a list of coordinate pairs per row. Constructing
                ``ConvertToToken`` below rewrites this column in place into
                lists of Points, which the subsequent point/grid join
                depends on.
            area (gpd.GeoDataFrame): Study area; used for the grid bounds
                and drawn on top of the heatmap.
            ax (matplotlib.axes.Axes): Axes to draw on.
            cell_size (int): Side length of each grid cell in meters.
            title (str): Axes title. Defaults to 'Generated Trajectories'.
        """
        # Side effect relied upon: this converts df['geometry'] to Points.
        token_creator = ConvertToToken(df, area, cell_size)
        grid, _, num_cells = token_creator.create_grid()
        merged = self.merge_grid_with_points(grid, df, num_cells)
        # Drop points that fell outside every cell.
        df_valid = merged[merged['point_region'] != 'nan']
        polygon_counts = df_valid['point_region'].value_counts()
        polygon_counts_df = pd.DataFrame({'geometry': polygon_counts.index, 'count': polygon_counts.values})
        polygon_counts_gdf = gpd.GeoDataFrame(polygon_counts_df)
        polygon_counts_gdf = polygon_counts_gdf.set_geometry('geometry')
        # Plotting the heatmap
        polygon_counts_gdf.plot(column='count', cmap='YlOrRd', linewidth=0.8, ax=ax, edgecolor='0.8', legend=True)
        area.plot(ax=ax, color='none')
        ax.set_xlabel('Longitude')
        ax.set_ylabel('Latitude')
        ax.set_title(title)