Summarize this post with:
I built a Python Streamlit app that visualizes Internal Link clusters via page type regex so that you can visualize link flow amongst content groups.
Let’s see what the outcome looks like.
How does it work?
I will share the Python script as well, but first let’s break down how it works from the UI side.
You provide a regex pattern for each page type, assign it a cluster name, and then generate the Sankey visualization.
Let’s see that view of Streamlit
I did this exercise with a Screaming Frog crawl of the Gymshark website.
Now here is the full Python Script that you can use
#!/usr/bin/env python3
"""
Regex-Based Internal Linking Sankey Visualizer (Streamlit App)
Upload an internal linking CSV, define URL-contains rules to cluster URLs,
and visualize link flow between clusters as an interactive Sankey diagram.
Download the Sankey as a self-contained HTML file.
Usage:
streamlit run "sitemap_based-inlinks-sankey - Copy.py"
"""
import re
import csv
import json
from collections import defaultdict
from io import StringIO
import streamlit as st
# ─────────────────────────────────────────────────────────────────────
# SESSION STATE DEFAULTS
# ─────────────────────────────────────────────────────────────────────
# Label given to URLs that match none of the user-defined rules.
UNCLUSTERED_LABEL = "Unclustered"

# Seed per-session values only when they are absent, so that reruns of the
# script keep whatever the user has already entered or generated.
if "sankey_html" not in st.session_state:
    st.session_state["sankey_html"] = None
if "rules" not in st.session_state:
    # Start the rules editor with a single empty row.
    st.session_state["rules"] = [{"pattern": "", "cluster": ""}]
# ─────────────────────────────────────────────────────────────────────
# URL CLASSIFICATION
# ─────────────────────────────────────────────────────────────────────
def compile_rules(rules: list[dict]) -> list[tuple[re.Pattern, str]]:
    """Turn rule dicts into ``(compiled_regex, cluster_name)`` tuples.

    Each rule is read through its ``"pattern"`` and ``"cluster"`` keys, both
    trimmed of surrounding whitespace. Rules with a blank pattern or a blank
    cluster name are ignored. Patterns are compiled case-insensitively; one
    that fails to compile produces a Streamlit warning and is left out.
    """
    pairs: list[tuple[re.Pattern, str]] = []
    for rule in rules:
        expr = rule.get("pattern", "").strip()
        name = rule.get("cluster", "").strip()
        if expr and name:
            try:
                regex = re.compile(expr, re.IGNORECASE)
            except re.error:
                st.warning(f"Invalid regex pattern: `{expr}` — skipped.")
            else:
                pairs.append((regex, name))
    return pairs
def classify_url(url: str, compiled_rules: list[tuple[re.Pattern, str]]) -> str:
    """Name the cluster of *url*: the first rule whose regex matches wins.

    Rules are tried in list order using ``search`` (a match anywhere in the
    URL counts). When nothing matches, ``UNCLUSTERED_LABEL`` is returned.
    """
    matching_names = (
        name for pattern, name in compiled_rules if pattern.search(url)
    )
    try:
        return next(matching_names)
    except StopIteration:
        return UNCLUSTERED_LABEL
def normalize_url(raw: str) -> str:
    """Return *raw* trimmed, without its ``#fragment`` or trailing slashes.

    The fragment is removed before trailing slashes are stripped; doing it the
    other way round leaves the slash in place for inputs such as
    ``"https://a.com/page/#section"``.

    Args:
        raw: URL text; may be empty or ``None``.

    Returns:
        The normalized URL, or ``""`` when *raw* is empty or ``None``.
    """
    if not raw:
        return ""
    without_fragment = raw.strip().partition("#")[0]
    return without_fragment.rstrip("/")
# ─────────────────────────────────────────────────────────────────────
# CSV PROCESSING
# ─────────────────────────────────────────────────────────────────────
def process_csv(
    file_content: str,
    source_col: str,
    target_col: str,
    compiled_rules: list[tuple[re.Pattern, str]],
    exclude_unclustered: bool,
    delimiter: str = ",",
) -> tuple[dict[tuple[str, str], int], dict]:
    """Count links between clusters for every row of the CSV text.

    Each row's source and target URLs are classified with ``classify_url``.
    Rows with an empty source or target are skipped. When
    ``exclude_unclustered`` is true, rows where either side is
    ``UNCLUSTERED_LABEL`` are skipped too (they still contribute to the
    per-cluster appearance counts).

    Returns:
        A pair ``(flow, stats)``: ``flow`` maps
        ``(source_cluster, target_cluster)`` to a link count, and ``stats``
        holds ``total_rows``, ``skipped_empty``, ``skipped_unclustered``,
        ``counted`` and ``cluster_url_counts``.
    """
    pair_counts: dict[tuple[str, str], int] = defaultdict(int)
    appearances: dict[str, int] = defaultdict(int)
    tally = {
        "total_rows": 0,
        "skipped_empty": 0,
        "skipped_unclustered": 0,
        "counted": 0,
    }

    for record in csv.DictReader(StringIO(file_content), delimiter=delimiter):
        tally["total_rows"] += 1

        source_url = (record.get(source_col, "") or "").strip()
        target_url = (record.get(target_col, "") or "").strip()
        if not (source_url and target_url):
            tally["skipped_empty"] += 1
            continue

        source_cluster = classify_url(source_url, compiled_rules)
        target_cluster = classify_url(target_url, compiled_rules)

        # Appearances are recorded before the unclustered filter is applied.
        appearances[source_cluster] += 1
        appearances[target_cluster] += 1

        touches_unclustered = UNCLUSTERED_LABEL in (source_cluster, target_cluster)
        if exclude_unclustered and touches_unclustered:
            tally["skipped_unclustered"] += 1
            continue

        pair_counts[(source_cluster, target_cluster)] += 1
        tally["counted"] += 1

    tally["cluster_url_counts"] = dict(appearances)
    return dict(pair_counts), tally
# ─────────────────────────────────────────────────────────────────────
# SANKEY DATA PREPARATION
# ─────────────────────────────────────────────────────────────────────
def prepare_sankey_data(
    flow: dict[tuple[str, str], int], min_links: int = 0
) -> dict:
    """Convert aggregated cluster flows into Sankey nodes and links.

    Args:
        flow: Mapping of ``(source_cluster, target_cluster)`` to link count.
        min_links: Minimum number of links a flow needs in order to be shown.
            A flow with exactly ``min_links`` links is kept.

    Returns:
        ``{"nodes": [...], "links": [...]}``. Nodes are the cluster names that
        appear in at least one kept flow, sorted alphabetically, each as
        ``{"id": name, "label": name}``. Links reference nodes by index and
        carry ``value``, ``sourceLabel`` and ``targetLabel``; they are sorted
        by ``value`` in descending order.
    """
    # A flow without any links is never drawn, so the effective threshold is
    # at least 1 even when ``min_links`` is 0.
    threshold = max(min_links, 1)
    kept = [(pair, count) for pair, count in flow.items() if count >= threshold]

    names = sorted({name for pair, _ in kept for name in pair})
    node_index = {name: position for position, name in enumerate(names)}
    nodes = [{"id": name, "label": name} for name in names]

    links = [
        {
            "source": node_index[src],
            "target": node_index[tgt],
            "value": count,
            "sourceLabel": src,
            "targetLabel": tgt,
        }
        for (src, tgt), count in kept
    ]
    links.sort(key=lambda link: link["value"], reverse=True)
    return {"nodes": nodes, "links": links}
# ─────────────────────────────────────────────────────────────────────
# HTML GENERATION
# ─────────────────────────────────────────────────────────────────────
# Self-contained page: the data is embedded as JSON and drawn with plain
# DOM/SVG calls, so the file works offline without any external script.
_SANKEY_HTML_TEMPLATE = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Internal Linking — Cluster Sankey</title>
<style>
  body { font-family: system-ui, sans-serif; margin: 16px; color: #1f2933; }
  h1 { font-size: 20px; margin: 0 0 4px; }
  p.summary { color: #52606d; margin: 0 0 12px; }
  svg { width: 100%; height: 600px; }
  .link { fill: none; stroke-opacity: 0.35; }
  .link:hover { stroke-opacity: 0.7; }
  .node-label { font-size: 12px; }
</style>
</head>
<body>
<h1>Cluster Link Flow</h1>
<p class="summary">Total links: <span id="total">0</span></p>
<svg id="chart" viewBox="0 0 1000 600"></svg>
<script id="sankey-data" type="application/json">__SANKEY_DATA__</script>
<script>
(function () {
  var data = JSON.parse(document.getElementById("sankey-data").textContent);
  var nodes = data.nodes || [];
  var links = data.links || [];
  var NS = "http://www.w3.org/2000/svg";
  var W = 1000, TOP = 20, PAD = 8, BAR = 16;
  var X_LEFT = 230, X_RIGHT = W - 230 - BAR;

  var outSum = nodes.map(function () { return 0; });
  var inSum = nodes.map(function () { return 0; });
  var total = 0;
  links.forEach(function (link) {
    outSum[link.source] += link.value;
    inSum[link.target] += link.value;
    total += link.value;
  });
  document.getElementById("total").textContent = total.toLocaleString();
  if (!total) { return; }

  function countUsed(sums) {
    return sums.filter(function (v) { return v > 0; }).length;
  }
  var rows = Math.max(countUsed(outSum), countUsed(inSum));
  var H = Math.max(600, 2 * TOP + rows * (PAD + 20));
  var scale = (H - 2 * TOP - PAD * (rows - 1)) / total;

  var svg = document.getElementById("chart");
  svg.setAttribute("viewBox", "0 0 " + W + " " + H);
  svg.style.height = H + "px";

  function add(parent, name, attrs) {
    var node = document.createElementNS(NS, name);
    Object.keys(attrs).forEach(function (key) {
      node.setAttribute(key, attrs[key]);
    });
    parent.appendChild(node);
    return node;
  }
  function color(i) {
    return "hsl(" + Math.round(i * 360 / nodes.length) + ",60%,50%)";
  }

  var linkLayer = add(svg, "g", {});

  function drawColumn(sums, x, labelX, anchor) {
    var y = TOP;
    var slots = {};
    nodes.forEach(function (node, i) {
      if (sums[i] <= 0) { return; }
      var h = sums[i] * scale;
      slots[i] = { y: y, used: 0 };
      add(svg, "rect", {
        x: x, y: y, width: BAR, height: Math.max(h, 1), fill: color(i)
      });
      var text = add(svg, "text", {
        x: labelX, y: y + h / 2, "class": "node-label",
        "text-anchor": anchor, "dominant-baseline": "middle"
      });
      text.textContent = node.label + " (" + sums[i].toLocaleString() + ")";
      y += h + PAD;
    });
    return slots;
  }

  var sources = drawColumn(outSum, X_LEFT, X_LEFT - 6, "end");
  var targets = drawColumn(inSum, X_RIGHT, X_RIGHT + BAR + 6, "start");

  links.forEach(function (link) {
    var s = sources[link.source];
    var t = targets[link.target];
    var w = link.value * scale;
    var y0 = s.y + s.used + w / 2;
    var y1 = t.y + t.used + w / 2;
    s.used += w;
    t.used += w;
    var x0 = X_LEFT + BAR, x1 = X_RIGHT, xm = (x0 + x1) / 2;
    var path = add(linkLayer, "path", {
      "class": "link",
      d: "M" + x0 + "," + y0 + " C" + xm + "," + y0 + " " + xm + "," + y1 +
         " " + x1 + "," + y1,
      stroke: color(link.source),
      "stroke-width": Math.max(w, 1)
    });
    add(path, "title", {}).textContent =
      link.sourceLabel + " -> " + link.targetLabel + ": " +
      link.value.toLocaleString();
  });
})();
</script>
</body>
</html>
"""


def generate_html(sankey_data: dict) -> str:
    """Build a standalone HTML page that draws *sankey_data* as a Sankey.

    The previous body serialized the data but never placed it in the returned
    string, which was not an HTML document. This version embeds the JSON in a
    complete page and renders it with inline SVG: clusters acting as link
    sources form the left column, clusters acting as link targets form the
    right column, and each link's thickness is proportional to its ``value``.

    Args:
        sankey_data: Mapping with ``"nodes"`` (items carrying ``"label"``) and
            ``"links"`` (items carrying ``"source"`` and ``"target"`` node
            indexes, ``"value"``, ``"sourceLabel"`` and ``"targetLabel"``).

    Returns:
        The HTML document as a string.
    """
    data_json = json.dumps(sankey_data, indent=None)
    # Cluster names are user input. "<" can only occur inside JSON string
    # values, so escaping it keeps a name such as "</script>" from ending the
    # data block while JSON.parse still decodes the original text.
    safe_json = data_json.replace("<", "\\u003c")
    return _SANKEY_HTML_TEMPLATE.replace("__SANKEY_DATA__", safe_json)
# ─────────────────────────────────────────────────────────────────────
# STREAMLIT APP
# ─────────────────────────────────────────────────────────────────────
def main():
    """Render the Streamlit page: upload, rules editor, generation, output.

    Flow of one script run:
      1. Sidebar: upload the CSV, pick the delimiter, the source/target
         columns and the display options.
      2. Main area: edit the list of regex -> cluster rules kept in
         ``st.session_state.rules``.
      3. On "Generate Sankey": compile the rules, aggregate the CSV, build the
         HTML and store the results in session state.
      4. Show the stored Sankey, a download button and a processing summary.
    """
    st.set_page_config(
        page_title="Internal Link Sankey Builder",
        page_icon="🔗",
        layout="wide",
    )
    st.title("Internal Link Sankey Builder")
    st.caption(
        "Upload a CSV with internal links, define regex rules to cluster URLs, "
        "and visualize link flow as an interactive Sankey."
    )

    # ── Sidebar: File upload & column config ──
    with st.sidebar:
        st.header("1. Upload CSV")
        uploaded_file = st.file_uploader(
            "Internal linking CSV", type=["csv", "tsv", "txt"]
        )
        delimiter = st.selectbox("Delimiter", [",", "\t", ";", "|"], index=0)
        if uploaded_file is not None:
            # "utf-8-sig" drops a leading BOM so the first column name is clean.
            file_content = uploaded_file.getvalue().decode("utf-8-sig", errors="replace")
            reader = csv.DictReader(StringIO(file_content), delimiter=delimiter)
            columns = reader.fieldnames or []
            if not columns:
                st.error("No columns found in CSV. Check delimiter.")
                return
            st.header("2. Select Columns")
            source_col = st.selectbox("Source URL column", columns, index=0)
            # Default the target to the second column when there is one.
            target_idx = min(1, len(columns) - 1)
            target_col = st.selectbox(
                "Target URL column", columns, index=target_idx
            )
            st.header("3. Options")
            exclude_unclustered = st.checkbox(
                "Exclude unclustered URLs", value=False,
                help="Hide links where either source or target doesn't match any rule.",
            )
            min_links = st.number_input(
                "Min links to show a flow", min_value=0, value=0, step=1
            )

    # ── Main area: Rules editor ──
    st.header("URL Clustering Rules")
    st.markdown(
        "Define regex patterns to match against URLs. "
        "Each URL is assigned to the **first matching** rule's cluster. "
        "Full regex supported — `|` for OR, `^`/`$` for exact match, `\\.` for literal dot."
    )
    col_pattern, col_cluster, col_action = st.columns([3, 2, 1])
    with col_pattern:
        st.markdown("**URL pattern (regex)**")
    with col_cluster:
        st.markdown("**Cluster name**")
    with col_action:
        st.markdown("**Action**")

    rules_to_remove = []
    for i, rule in enumerate(st.session_state.rules):
        # Widgets are keyed by a stable per-rule id rather than the list
        # index. With index-based keys, removing a row shifts later rules
        # onto keys whose stored widget state belongs to another row, so a
        # surviving rule can show (and be overwritten with) the removed
        # row's text.
        if "id" not in rule:
            next_id = st.session_state.get("rule_id_seq", 0)
            rule["id"] = next_id
            st.session_state["rule_id_seq"] = next_id + 1
        rule_id = rule["id"]

        col_p, col_c, col_a = st.columns([3, 2, 1])
        with col_p:
            rule["pattern"] = st.text_input(
                f"pattern_{rule_id}",
                value=rule["pattern"],
                label_visibility="collapsed",
                placeholder="e.g. /blog/|/news/ or ^https://example\\.com/$",
                key=f"pat_{rule_id}",
            )
        with col_c:
            rule["cluster"] = st.text_input(
                f"cluster_{rule_id}",
                value=rule["cluster"],
                label_visibility="collapsed",
                placeholder="e.g. Technical SEO",
                key=f"cls_{rule_id}",
            )
        with col_a:
            if st.button("Remove", key=f"rm_{rule_id}", use_container_width=True):
                rules_to_remove.append(i)

    if rules_to_remove:
        # Pop from the end so earlier indexes stay valid.
        for idx in sorted(rules_to_remove, reverse=True):
            st.session_state.rules.pop(idx)
        st.rerun()

    col_add, col_clear, _ = st.columns([1, 1, 3])
    with col_add:
        if st.button("+ Add rule", use_container_width=True):
            st.session_state.rules.append({"pattern": "", "cluster": ""})
            st.rerun()
    with col_clear:
        if st.button("Clear all", use_container_width=True):
            # The fresh row gets a new id on the next run, hence fresh widgets.
            st.session_state.rules = [{"pattern": "", "cluster": ""}]
            st.rerun()

    # ── Generate ──
    st.divider()
    if uploaded_file is None:
        st.info("Upload a CSV in the sidebar to get started.")
        return

    valid_rules = [
        r
        for r in st.session_state.rules
        if r["pattern"].strip() and r["cluster"].strip()
    ]
    if not valid_rules:
        st.warning("Add at least one clustering rule with both a pattern and a cluster name.")
        return

    if st.button("Generate Sankey", type="primary", use_container_width=True):
        compiled = compile_rules(valid_rules)
        if not compiled:
            st.error("No valid rules to apply.")
            return
        with st.spinner("Processing CSV and building Sankey..."):
            flow, stats = process_csv(
                file_content,
                source_col,
                target_col,
                compiled,
                exclude_unclustered,
                delimiter,
            )
            if not flow:
                st.error("No link flows found. Check your columns and rules.")
                return
            sankey_data = prepare_sankey_data(flow, min_links=min_links)
            html = generate_html(sankey_data)
            # Keep the results in session state so they survive later reruns.
            st.session_state.sankey_html = html
            st.session_state.sankey_stats = stats
            st.session_state.sankey_flow = flow
        total_links = sum(flow.values())
        clusters_found = set()
        for src, tgt in flow:
            clusters_found.add(src)
            clusters_found.add(tgt)
        st.success(
            f"Done — {total_links:,} links across {len(clusters_found)} clusters, "
            f"{len(flow)} unique flows."
        )

    # ── Display & Download ──
    if st.session_state.sankey_html:
        st.subheader("Sankey Diagram")
        st.components.v1.html(st.session_state.sankey_html, height=700, scrolling=False)
        st.download_button(
            label="Download Sankey as HTML",
            data=st.session_state.sankey_html,
            file_name="cluster_sankey.html",
            mime="text/html",
            use_container_width=True,
        )
        if st.session_state.get("sankey_stats"):
            stats = st.session_state.sankey_stats
            with st.expander("Debug: Processing Summary", expanded=False):
                col1, col2, col3, col4 = st.columns(4)
                col1.metric("Total CSV rows", f"{stats['total_rows']:,}")
                col2.metric("Counted (used)", f"{stats['counted']:,}")
                col3.metric("Skipped (empty src/tgt)", f"{stats['skipped_empty']:,}")
                col4.metric("Skipped (unclustered)", f"{stats['skipped_unclustered']:,}")
                st.markdown("**URL appearances per cluster** (source + target side counted separately):")
                cluster_counts = stats["cluster_url_counts"]
                for cluster in sorted(cluster_counts, key=cluster_counts.get, reverse=True):
                    st.text(f" {cluster}: {cluster_counts[cluster]:,}")
                st.markdown("**Flow breakdown** (source cluster -> target cluster = count):")
                flow = st.session_state.get("sankey_flow", {})
                for (src, tgt), count in sorted(flow.items(), key=lambda x: x[1], reverse=True):
                    st.text(f" {src} -> {tgt} = {count:,}")


if __name__ == "__main__":
    main()
You can also download sankey chart visualisation as an HTML file.
This visualization can be extremely useful to visualize flow of internal linking between the content groups.
It even quantifies the number of links flowing in each direction between clusters.

Kunjal Chawhan, founder of Decode Digital Market: a Digital Marketer by profession and a Digital Marketing niche blogger by passion, here to share my knowledge.
