"""Tests for transformers module."""

import pytest

from fhi_statistikk_mcp.transformers import (
    complete_query_dimensions,
    extract_metadata_fields,
    flatten_categories,
    is_year_dimension,
    matches_search,
    navigate_hierarchy,
    normalize_for_search,
    normalize_year_value,
    parse_csv_to_rows,
    strip_html,
    summarize_dimensions,
)

# NOTE(review): this file had been collapsed onto a few physical lines and
# passed through an HTML renderer, which removed the markup from the HTML
# fixtures below. Literals carrying a "reconstructed" note are best-effort
# restorations and should be confirmed against the upstream test file.


# --- strip_html ---


def test_strip_html_removes_tags():
    """Surrounding tags are dropped and only the text content remains."""
    # NOTE(review): <p> wrapper reconstructed — confirm against upstream.
    assert strip_html("<p>Hello world</p>") == "Hello world"


def test_strip_html_preserves_plain_text():
    """Input without markup is returned unchanged."""
    assert strip_html("No tags here") == "No tags here"


def test_strip_html_handles_empty():
    """Empty string and None are passed through as-is."""
    assert strip_html("") == ""
    assert strip_html(None) is None


def test_strip_html_handles_links():
    """Anchor elements are reduced to their link text."""
    # NOTE(review): anchor markup and URL reconstructed — confirm upstream.
    assert strip_html('<a href="https://example.com">link</a>') == "link"


def test_strip_html_decodes_entities():
    """Character entities are decoded instead of being treated as markup."""
    # NOTE(review): entity escapes and expected value reconstructed from a
    # rendered copy — confirm both sides against upstream.
    assert strip_html("&amp; &lt;b&gt; &nbsp;") == "& <b>"


# --- normalize_for_search / matches_search ---


def test_normalize_strips_accents():
    """Norwegian letters are folded to ASCII look-alikes and lowercased."""
    assert normalize_for_search("Tromsø") == "tromso"
    assert normalize_for_search("Bærum") == "barum"
    assert normalize_for_search("Ålesund") == "alesund"


def test_normalize_lowercases():
    """Upper-case input is lowercased."""
    assert normalize_for_search("OSLO") == "oslo"


def test_matches_search_single_word():
    """A single query word matches as a case-insensitive substring."""
    assert matches_search("Befolkningsvekst", "befolkning")
    assert not matches_search("Befolkningsvekst", "helse")


def test_matches_search_multiple_words():
    """Every query word must be present for the text to match."""
    assert matches_search("Befolkningsvekst Oslo", "befolkning oslo")
    assert not matches_search("Befolkningsvekst", "befolkning oslo")


def test_matches_search_accent_insensitive():
    """ASCII queries match text containing Norwegian letters."""
    assert matches_search("Tromsø kommune", "tromso")
    assert matches_search("Bærum", "barum")


# --- normalize_year_value ---


def test_normalize_year_short():
    """A bare four-digit year is expanded to the YYYY_YYYY form."""
    assert normalize_year_value("2020") == "2020_2020"


def test_normalize_year_already_full():
    """A value already in YYYY_YYYY form is left untouched."""
    assert normalize_year_value("2020_2020") == "2020_2020"


def test_normalize_year_non_numeric():
    """Non-numeric values are returned unchanged."""
    assert normalize_year_value("all") == "all"


# --- flatten_categories ---

# Three-level geography tree shared by the flatten/navigate tests and by
# SAMPLE_DIMS: one root, two counties, three municipalities (6 nodes).
NESTED_TREE = [
    {
        "value": "0",
        "label": "Hele landet",
        "children": [
            {
                "value": "03",
                "label": "Oslo (fylke)",
                "children": [
                    {"value": "0301", "label": "Oslo", "children": []},
                ],
            },
            {
                "value": "18",
                "label": "Nordland",
                "children": [
                    {"value": "1804", "label": "Bodø", "children": []},
                    {"value": "1806", "label": "Narvik", "children": []},
                ],
            },
        ],
    },
]


def test_flatten_categories_count():
    """Flattening yields one entry per node at every depth."""
    flat = flatten_categories(NESTED_TREE)
    assert len(flat) == 6


def test_flatten_categories_parent_values():
    """Each flattened entry records its parent's value (None for the root)."""
    flat = flatten_categories(NESTED_TREE)
    by_value = {c["value"]: c for c in flat}
    assert by_value["0"]["parent_value"] is None
    assert by_value["03"]["parent_value"] == "0"
    assert by_value["0301"]["parent_value"] == "03"
    assert by_value["1804"]["parent_value"] == "18"


def test_flatten_categories_empty():
    """An empty tree flattens to an empty list."""
    assert flatten_categories([]) == []


# --- navigate_hierarchy ---


def test_navigate_top_level():
    """Without arguments only the root is returned, with its child count."""
    result = navigate_hierarchy(NESTED_TREE)
    assert len(result) == 1
    assert result[0]["value"] == "0"
    assert result[0]["child_count"] == 2


def test_navigate_children():
    """parent_value selects the direct children of that node."""
    result = navigate_hierarchy(NESTED_TREE, parent_value="18")
    assert len(result) == 2
    values = {r["value"] for r in result}
    assert values == {"1804", "1806"}


def test_navigate_search():
    """search finds a nested node by its label."""
    result = navigate_hierarchy(NESTED_TREE, search="bodø")
    assert len(result) == 1
    assert result[0]["value"] == "1804"


def test_navigate_search_accent_insensitive():
    """An ASCII search term matches a label containing 'ø'."""
    result = navigate_hierarchy(NESTED_TREE, search="bodo")
    assert len(result) == 1
    assert result[0]["label"] == "Bodø"


# --- summarize_dimensions ---


def test_summarize_fixed_dimension():
    """A dimension with a single category is reported as fixed."""
    dims = [
        {
            "code": "KJONN",
            "label": "Kjønn",
            "categories": [
                {"value": "0", "label": "kjønn samlet", "children": []},
            ],
        }
    ]
    result = summarize_dimensions(dims)
    assert len(result) == 1
    assert result[0]["is_fixed"] is True
    assert result[0]["total_categories"] == 1


def test_summarize_year_dimension():
    """A year dimension reports its value format and first..last range."""
    cats = [
        {"value": f"{y}_{y}", "label": str(y), "children": []}
        for y in range(2020, 2025)
    ]
    dims = [{"code": "AAR", "label": "År", "categories": cats}]
    result = summarize_dimensions(dims)
    assert result[0]["value_format"] == "YYYY_YYYY (e.g. 2020_2020)"
    assert result[0]["range"] == "2020..2024"


def test_summarize_hierarchical_large():
    """A root with 29 children is summarised via its top-level values."""
    children = [
        {"value": str(i), "label": f"Municipality {i}", "children": []}
        for i in range(1, 30)
    ]
    cats = [{"value": "0", "label": "Hele landet", "children": children}]
    dims = [{"code": "GEO", "label": "Geografi", "categories": cats}]
    result = summarize_dimensions(dims)
    assert result[0]["is_hierarchical"] is True
    assert "top_level_values" in result[0]
    assert result[0]["top_level_values"][0]["child_count"] == 29


def test_summarize_small_dimension():
    """A two-category dimension lists its values as value/label pairs."""
    cats = [
        {"value": "TELLER", "label": "antall", "children": []},
        {"value": "RATE", "label": "prosent", "children": []},
    ]
    dims = [{"code": "MEASURE_TYPE", "label": "Måltall", "categories": cats}]
    result = summarize_dimensions(dims)
    assert len(result[0]["values"]) == 2
    assert result[0]["values"][0] == {"value": "TELLER", "label": "antall"}


# --- extract_metadata_fields ---


def test_extract_metadata_dict():
    """Known paragraph headers are mapped onto the flat metadata fields."""
    meta = {
        "name": "Test",
        "isOfficialStatistics": True,
        "paragraphs": [
            # NOTE(review): <p> wrapper reconstructed — confirm upstream.
            {"header": "Beskrivelse", "content": "<p>Some description</p>"},
            {"header": "Oppdateringsfrekvens", "content": "Årlig"},
            {"header": "Nøkkelord", "content": "Helse,Data"},
            {"header": "Kildeinstitusjon", "content": "FHI"},
        ],
    }
    fields = extract_metadata_fields(meta)
    assert fields["is_official_statistics"] is True
    assert fields["description"] == "Some description"
    assert fields["update_frequency"] == "Årlig"
    assert fields["keywords"] == ["Helse", "Data"]
    assert fields["source_institution"] == "FHI"


def test_extract_metadata_strips_html():
    """Markup inside paragraph content is removed from the description."""
    meta = {
        "paragraphs": [
            {
                "header": "Beskrivelse",
                # NOTE(review): paragraph/anchor markup and URL reconstructed
                # — confirm against upstream.
                "content": (
                    "<p>Text with <a href='https://example.com'>link</a></p>"
                ),
            },
        ],
    }
    fields = extract_metadata_fields(meta)
    assert fields["description"] == "Text with link"


# --- parse_csv_to_rows ---


def test_parse_csv_basic():
    """Semicolon-separated, quoted CSV is parsed into dict rows."""
    csv_text = '"Col A";"Col B"\n"Oslo";"123"\n"Bergen";"456"\n'
    result = parse_csv_to_rows(csv_text)
    assert result["total_rows"] == 2
    assert result["truncated"] is False
    assert result["rows"][0]["Col A"] == "Oslo"
    assert result["rows"][0]["Col B"] == 123


def test_parse_csv_truncation():
    """max_rows caps the returned rows while total_rows counts all of them."""
    csv_text = '"X"\n"a"\n"b"\n"c"\n'
    result = parse_csv_to_rows(csv_text, max_rows=2)
    assert result["total_rows"] == 3
    assert result["truncated"] is True
    assert len(result["rows"]) == 2


def test_parse_csv_numeric_conversion():
    """Cells become int/float where possible; '..' becomes None."""
    csv_text = '"int";"float";"missing";"text"\n"42";"3.14";"..";"hello"\n'
    result = parse_csv_to_rows(csv_text)
    row = result["rows"][0]
    assert row["int"] == 42
    assert row["float"] == 3.14
    assert row["missing"] is None
    assert row["text"] == "hello"


def test_parse_csv_comma_decimal():
    """A comma decimal separator is accepted when converting to float."""
    csv_text = '"val"\n"1,5"\n'
    result = parse_csv_to_rows(csv_text)
    assert result["rows"][0]["val"] == 1.5


# --- is_year_dimension ---


def test_is_year_by_code():
    """The codes AAR and YEAR identify a year dimension; GEO does not."""
    assert is_year_dimension("AAR", []) is True
    assert is_year_dimension("YEAR", []) is True
    assert is_year_dimension("GEO", []) is False


def test_is_year_by_value_format():
    """An unknown code is still a year dimension if values look like years."""
    flat = [{"value": "2020_2020", "label": "2020", "parent_value": None}]
    assert is_year_dimension("CUSTOM", flat) is True


# --- complete_query_dimensions ---

# Table definition used by the complete_query_dimensions tests: GEO and AAR
# have several categories, KJONN and ALDER have exactly one each, and
# MEASURE_TYPE has two.
SAMPLE_DIMS = [
    {"code": "GEO", "label": "Geografi", "categories": NESTED_TREE},
    {
        "code": "AAR",
        "label": "År",
        "categories": [
            {"value": "2023_2023", "label": "2023", "children": []},
            {"value": "2024_2024", "label": "2024", "children": []},
        ],
    },
    {
        "code": "KJONN",
        "label": "Kjønn",
        "categories": [
            {"value": "0", "label": "kjønn samlet", "children": []},
        ],
    },
    {
        "code": "ALDER",
        "label": "Alder",
        "categories": [
            {"value": "0_120", "label": "alle aldre", "children": []},
        ],
    },
    {
        "code": "MEASURE_TYPE",
        "label": "Måltall",
        "categories": [
            {"value": "TELLER", "label": "antall", "children": []},
            {"value": "RATE", "label": "prosent", "children": []},
        ],
    },
]


def test_complete_dims_fixed_auto_included():
    """Omitted single-category dimensions are added with their only value."""
    user_dims = [
        {"code": "GEO", "filter": "item", "values": ["0301"]},
        {"code": "AAR", "filter": "bottom", "values": ["1"]},
    ]
    result = complete_query_dimensions(SAMPLE_DIMS, user_dims)
    codes = {d["code"] for d in result}
    assert "KJONN" in codes
    assert "ALDER" in codes
    kjonn = next(d for d in result if d["code"] == "KJONN")
    assert kjonn["values"] == ["0"]


def test_complete_dims_measure_type_defaults_to_all():
    """An omitted MEASURE_TYPE is filled in as filter 'all' with ['*']."""
    user_dims = [
        {"code": "GEO", "filter": "item", "values": ["0"]},
        {"code": "AAR", "filter": "item", "values": ["2024"]},
    ]
    result = complete_query_dimensions(SAMPLE_DIMS, user_dims)
    mt = next(d for d in result if d["code"] == "MEASURE_TYPE")
    assert mt["filter"] == "all"
    assert mt["values"] == ["*"]


def test_complete_dims_year_normalization():
    """Bare years given with an 'item' filter are expanded to YYYY_YYYY."""
    user_dims = [
        {"code": "GEO", "filter": "item", "values": ["0"]},
        {"code": "AAR", "filter": "item", "values": ["2024"]},
    ]
    result = complete_query_dimensions(SAMPLE_DIMS, user_dims)
    aar = next(d for d in result if d["code"] == "AAR")
    assert aar["values"] == ["2024_2024"]


def test_complete_dims_missing_required_raises():
    """Leaving out GEO raises ValueError naming the missing dimension."""
    user_dims = [
        {"code": "AAR", "filter": "item", "values": ["2024"]},
    ]
    with pytest.raises(ValueError, match="Missing required dimensions.*GEO"):
        complete_query_dimensions(SAMPLE_DIMS, user_dims)


def test_complete_dims_missing_code_key_raises():
    """A user dimension without a 'code' key raises ValueError."""
    user_dims = [{"filter": "item", "values": ["0"]}]
    with pytest.raises(ValueError, match="missing 'code' key"):
        complete_query_dimensions(SAMPLE_DIMS, user_dims)


def test_complete_dims_case_insensitive():
    """Lower-case user codes come back as the upper-case table codes."""
    user_dims = [
        {"code": "geo", "filter": "item", "values": ["0"]},
        {"code": "aar", "filter": "item", "values": ["2024"]},
    ]
    result = complete_query_dimensions(SAMPLE_DIMS, user_dims)
    codes = [d["code"] for d in result]
    assert "GEO" in codes
    assert "AAR" in codes


def test_complete_dims_no_year_normalization_for_top_filter():
    """With a 'top' filter the year value is a count and is not expanded."""
    user_dims = [
        {"code": "GEO", "filter": "item", "values": ["0"]},
        {"code": "AAR", "filter": "top", "values": ["3"]},
    ]
    result = complete_query_dimensions(SAMPLE_DIMS, user_dims)
    aar = next(d for d in result if d["code"] == "AAR")
    assert aar["values"] == ["3"]  # not "3_3"