@ -571,6 +571,149 @@ def _compute_stability_fallback(
}
}
def compute_overtone_shift(
    con: duckdb.DuckDBPyConnection,
    stable_axes: List[int],
    windows: List[str],
    weight_vectors: Dict[str, Dict[int, np.ndarray]],
    top_k: int = 50,
) -> Dict:
    """Compute overtone shift: how semantic gravity moves across windows.

    Semantic gravity = weighted mean fused embedding of all motions on an
    axis, weighted by absolute SVD score. Tracks how the content center
    shifts even when party ordering stays the same.

    Args:
        con: DuckDB connection used by the per-window data loaders.
        stable_axes: 1-based axis numbers to analyze.
        windows: window identifiers (sorted internally before pairing).
        weight_vectors: window -> {axis -> regression weight vector}, as
            produced by compute_axis_stability; used for top-K dimension
            analysis.
        top_k: number of top-weighted dimensions to consider.

    Returns:
        Dict with "shift_series", "inflection_points" and
        "dimension_analysis", each keyed by axis.
    """
    if not stable_axes:
        return {"shift_series": {}, "inflection_points": {}, "dimension_analysis": {}}

    # Load each window's data exactly once. The original reloaded both
    # tables inside the axis loop, multiplying DB reads by len(stable_axes)
    # even though the data does not depend on the axis.
    window_data = {
        w: (_load_motion_scores(con, w), _load_fused_embeddings(con, w))
        for w in windows
    }

    shift_series: Dict[int, List[float]] = {}
    inflection_points: Dict[int, List[Dict]] = {}
    dimension_analysis: Dict[int, Dict] = {}

    for axis in stable_axes:
        comp_idx = axis - 1  # axes are 1-based; score vectors are 0-based
        gravities = {}  # window -> semantic-gravity vector
        for w in windows:
            motion_scores, fused = window_data[w]
            if not motion_scores or not fused:
                continue
            # Semantic gravity: mean fused embedding weighted by |SVD score|.
            valid = []
            weights = []
            for m_id, scores in motion_scores.items():
                if m_id in fused and comp_idx < len(scores):
                    valid.append(fused[m_id])
                    weights.append(abs(scores[comp_idx]))
            if not valid or sum(weights) == 0:
                continue
            # Truncate all embeddings to the shortest so they stack cleanly.
            dim = min(len(v) for v in valid)
            vectors = np.array([v[:dim] for v in valid])
            gravities[w] = np.average(vectors, axis=0, weights=np.array(weights))

        if len(gravities) < 2:
            continue  # need at least two windows to measure any shift

        # Cosine distance between consecutive windows' gravity vectors.
        window_list = sorted(gravities.keys())
        shifts: List[float] = []
        for w_prev, w_next in zip(window_list, window_list[1:]):
            a = gravities[w_prev]
            b = gravities[w_next]
            dim = min(len(a), len(b))
            a, b = a[:dim], b[:dim]
            norm_a = np.linalg.norm(a)
            norm_b = np.linalg.norm(b)
            if norm_a == 0 or norm_b == 0:
                shifts.append(0.0)
            else:
                shifts.append(1.0 - float(np.dot(a, b) / (norm_a * norm_b)))
        shift_series[axis] = shifts

        # Inflection points: shifts exceeding 2x the median shift.
        if len(shifts) < 3:
            inflection_points[axis] = []
            continue
        median_shift = np.median(shifts)
        threshold = 2 * median_shift if median_shift > 0 else 0.1
        inflection_points[axis] = [
            {
                "window_before": window_list[i],
                "window_after": window_list[i + 1],
                "shift": float(shift),
                "median_shift": float(median_shift),
            }
            for i, shift in enumerate(shifts)
            if shift > threshold
        ]

        # Dimension analysis: which embedding dimensions drive the shift.
        # BUG FIX: the original guarded this with `axis in weight_vectors`,
        # but weight_vectors is keyed by window name (str) per its signature,
        # so an int axis never matched and dimension_analysis stayed empty.
        all_weights = [
            weight_vectors[w][axis]
            for w in window_list
            if axis in weight_vectors.get(w, {})
        ]
        if all_weights:
            # Hoisted: the original recomputed min(len(...)) per element.
            min_len = min(len(x) for x in all_weights)
            avg_weights = np.mean([np.abs(v[:min_len]) for v in all_weights], axis=0)
            top_dims = np.argsort(avg_weights)[-top_k:][::-1]
            # How much each top dimension's gravity coordinate moves.
            dim_shifts = {}
            for d in top_dims[:20]:  # report top 20
                vals = [
                    gravities[w][d]
                    for w in window_list
                    if w in gravities and d < len(gravities[w])
                ]
                if len(vals) >= 2:
                    dim_shifts[int(d)] = {
                        "mean": float(np.mean(vals)),
                        "std": float(np.std(vals)),
                        "range": float(max(vals) - min(vals)),
                    }
            dimension_analysis[axis] = {
                "top_dimensions": [int(d) for d in top_dims[:20]],
                "dimension_shifts": dim_shifts,
            }

    return {
        "shift_series": shift_series,
        "inflection_points": inflection_points,
        "dimension_analysis": dimension_analysis,
    }
def compute_semantic_drift (
def compute_semantic_drift (
con : duckdb . DuckDBPyConnection ,
con : duckdb . DuckDBPyConnection ,
stable_axes : List [ int ] ,
stable_axes : List [ int ] ,
@ -837,6 +980,7 @@ def _generate_report(
party_result : Dict ,
party_result : Dict ,
windows : List [ str ] ,
windows : List [ str ] ,
top_n : int ,
top_n : int ,
overtone_result : Optional [ Dict ] = None ,
) - > str :
) - > str :
""" Generate markdown report with analysis results. """
""" Generate markdown report with analysis results. """
import matplotlib
import matplotlib
@ -1030,15 +1174,80 @@ def _generate_report(
lines . append ( " No cross-ideological voting detected. " )
lines . append ( " No cross-ideological voting detected. " )
lines . append ( " " )
lines . append ( " " )
# Overtone Shift
if overtone_result and overtone_result . get ( " shift_series " ) :
lines . append ( " ## Overtone Shift " )
lines . append ( " " )
lines . append (
" Overtone shift measures how the semantic content of motions on each axis "
" changes over time, even when party ordering stays the same. "
)
lines . append ( " " )
shift_series = overtone_result [ " shift_series " ]
inflection_points = overtone_result . get ( " inflection_points " , { } )
dim_analysis = overtone_result . get ( " dimension_analysis " , { } )
for axis , shifts in shift_series . items ( ) :
avg_shift = np . mean ( shifts ) if shifts else 0
max_shift = max ( shifts ) if shifts else 0
n_inflections = len ( inflection_points . get ( axis , [ ] ) )
lines . append ( f " ### Axis { axis } " )
lines . append ( " " )
lines . append ( f " - **Average shift:** { avg_shift : .4f } " )
lines . append ( f " - **Max shift:** { max_shift : .4f } " )
lines . append ( f " - **Inflection points:** { n_inflections } " )
lines . append ( " " )
# Show inflection points
if inflection_points . get ( axis ) :
for inf in inflection_points [ axis ] :
lines . append (
f " - ** { inf [ ' window_before ' ] } → { inf [ ' window_after ' ] } **: "
f " shift= { inf [ ' shift ' ] : .4f } (median= { inf [ ' median_shift ' ] : .4f } ) "
)
lines . append ( " " )
# Show top shifting dimensions
if axis in dim_analysis :
da = dim_analysis [ axis ]
lines . append ( " **Top shifting dimensions:** " )
lines . append ( " " )
lines . append ( " | Dim | Mean | Std | Range | " )
lines . append ( " |-----|------|-----|-------| " )
for d , stats in sorted (
da . get ( " dimension_shifts " , { } ) . items ( ) ,
key = lambda x : x [ 1 ] [ " range " ] ,
reverse = True ,
) [ : 10 ] :
lines . append (
f " | { d } | { stats [ ' mean ' ] : .4f } | { stats [ ' std ' ] : .4f } | { stats [ ' range ' ] : .4f } | "
)
lines . append ( " " )
# Methodology
# Methodology
lines . append ( " ## Methodology " )
lines . append ( " ## Methodology " )
lines . append ( " " )
lines . append ( " " )
lines . append (
lines . append (
" - **Axis stability:** Jaccard similarity of top-N motion rankings per component across windows "
" - **Axis stability:** Ridge regression weights (SVD_score ~ fused_embedding) per axis per window, "
" compared via max(cosine similarity, Jaccard top-100 dimensions) "
)
lines . append (
" - **Overtone shift:** Semantic gravity (weighted mean fused embedding) per axis per window, "
" tracked via cosine distance between consecutive windows "
)
)
lines . append (
lines . append (
" - **Semantic drift:** Cosine distance between fused embedding centroids of top-N motions per axis "
" - **Semantic drift:** Cosine distance between fused embedding centroids of top-N motions per axis "
)
)
lines . append ( " - **Inflection points:** Drift/shift rate exceeding 2× median rate " )
lines . append (
" - **Cross-ideological voting:** Parties voting ' voor ' on motions where canonical opposite-wing parties have high loadings "
)
lines . append ( " " )
lines . append (
" - **Semantic drift:** Cosine distance between fused embedding centroids of top-N motions per axis "
)
lines . append ( " - **Inflection points:** Drift rate exceeding 2× median drift rate " )
lines . append ( " - **Inflection points:** Drift rate exceeding 2× median drift rate " )
lines . append (
lines . append (
" - **Cross-ideological voting:** Parties voting ' voor ' on motions where canonical opposite-wing parties have high loadings "
" - **Cross-ideological voting:** Parties voting ' voor ' on motions where canonical opposite-wing parties have high loadings "
@ -1129,6 +1338,12 @@ def main(argv: Optional[List[str]] = None) -> int:
con , stability_result [ " stable_axes " ] , windows , args . top_n
con , stability_result [ " stable_axes " ] , windows , args . top_n
)
)
logger . info ( " Computing overtone shift... " )
weight_vectors = stability_result . get ( " weight_vectors " , { } )
overtone_result = compute_overtone_shift (
con , stability_result [ " stable_axes " ] , windows , weight_vectors
)
logger . info ( " Computing party voting analysis... " )
logger . info ( " Computing party voting analysis... " )
party_result = compute_party_voting (
party_result = compute_party_voting (
con , stability_result [ " stable_axes " ] , windows
con , stability_result [ " stable_axes " ] , windows
@ -1143,6 +1358,7 @@ def main(argv: Optional[List[str]] = None) -> int:
party_result ,
party_result ,
windows ,
windows ,
args . top_n ,
args . top_n ,
overtone_result = overtone_result ,
)
)
logger . info ( " Report generated: %s " , report_path )
logger . info ( " Report generated: %s " , report_path )