[e571d1]: / ext / lab2im / edit_volumes.py

Download this file

2837 lines (2423 with data), 142.7 kB

   1
   2
   3
   4
   5
   6
   7
   8
   9
  10
  11
  12
  13
  14
  15
  16
  17
  18
  19
  20
  21
  22
  23
  24
  25
  26
  27
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586
2587
2588
2589
2590
2591
2592
2593
2594
2595
2596
2597
2598
2599
2600
2601
2602
2603
2604
2605
2606
2607
2608
2609
2610
2611
2612
2613
2614
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
2628
2629
2630
2631
2632
2633
2634
2635
2636
2637
2638
2639
2640
2641
2642
2643
2644
2645
2646
2647
2648
2649
2650
2651
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
2725
2726
2727
2728
2729
2730
2731
2732
2733
2734
2735
2736
2737
2738
2739
2740
2741
2742
2743
2744
2745
2746
2747
2748
2749
2750
2751
2752
2753
2754
2755
2756
2757
2758
2759
2760
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
2812
2813
2814
2815
2816
2817
2818
2819
2820
2821
2822
2823
2824
2825
2826
2827
2828
2829
2830
2831
2832
2833
2834
2835
2836
"""
This file contains functions to edit/preprocess volumes (i.e. not tensors!).
These functions are sorted in five categories:
1- volume editing: this can be applied to any volume (i.e. images or label maps). It contains:
-mask_volume
-rescale_volume
-crop_volume
-crop_volume_around_region
-crop_volume_with_idx
-pad_volume
-flip_volume
-resample_volume
-resample_volume_like
-get_ras_axes
-align_volume_to_ref
-blur_volume
2- label map editing: can be applied to label maps only. It contains:
-correct_label_map
-mask_label_map
-smooth_label_map
-erode_label_map
-get_largest_connected_component
-compute_hard_volumes
-compute_distance_map
3- editing all volumes in a folder: functions are more or less the same as 1, but they now apply to all the volumes
in a given folder. Thus we provide folder paths rather than numpy arrays as inputs. It contains:
-mask_images_in_dir
-rescale_images_in_dir
-crop_images_in_dir
-crop_images_around_region_in_dir
-pad_images_in_dir
-flip_images_in_dir
-align_images_in_dir
-correct_nans_images_in_dir
-blur_images_in_dir
-create_mutlimodal_images
-convert_images_in_dir_to_nifty
-mri_convert_images_in_dir
-samseg_images_in_dir
-niftyreg_images_in_dir
-upsample_anisotropic_images
-simulate_upsampled_anisotropic_images
-check_images_in_dir
4- label maps in dir: same as 3 but for label map-specific functions. It contains:
-correct_labels_in_dir
-mask_labels_in_dir
-smooth_labels_in_dir
-erode_labels_in_dir
-upsample_labels_in_dir
-compute_hard_volumes_in_dir
-build_atlas
5- dataset editing: functions for editing datasets (i.e. images with corresponding label maps). It contains:
-check_images_and_labels
-crop_dataset_to_minimum_size
-subdivide_dataset_to_patches
If you use this code, please cite the first SynthSeg paper:
https://github.com/BBillot/lab2im/blob/master/bibtex.bib
Copyright 2020 Benjamin Billot
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is
distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing permissions and limitations under the
License.
"""
# python imports
import os
import csv
import shutil
import numpy as np
import tensorflow as tf
import keras.layers as KL
from keras.models import Model
from scipy.ndimage.filters import convolve
from scipy.ndimage import label as scipy_label
from scipy.interpolate import RegularGridInterpolator
from scipy.ndimage.morphology import distance_transform_edt, binary_fill_holes
from scipy.ndimage import binary_dilation, binary_erosion, gaussian_filter
# project imports
from ext.lab2im import utils
from ext.lab2im.layers import GaussianBlur, ConvertLabels
from ext.lab2im.edit_tensors import blurring_sigma_for_downsampling
# ---------------------------------------------------- edit volume -----------------------------------------------------
def mask_volume(volume, mask=None, threshold=0.1, dilate=0, erode=0, fill_holes=False, masking_value=0,
return_mask=False, return_copy=True):
"""Mask a volume, either with a given mask, or by keeping only the values above a threshold.
:param volume: a numpy array, possibly with several channels
:param mask: (optional) a numpy array to mask volume with.
Mask doesn't have to be a 0/1 array, all strictly positive values of mask are considered for masking volume.
Mask should have the same size as volume. If volume has several channels, mask can either be uni- or multi-channel.
In the first case, the same mask is applied to all channels.
:param threshold: (optional) If mask is None, masking is performed by keeping thresholding the input.
:param dilate: (optional) number of voxels by which to dilate the provided or computed mask.
:param erode: (optional) number of voxels by which to erode the provided or computed mask.
:param fill_holes: (optional) whether to fill the holes in the provided or computed mask.
:param masking_value: (optional) masking value
:param return_mask: (optional) whether to return the applied mask
:param return_copy: (optional) whether to return the original volume or a copy. Default is copy.
:return: the masked volume, and the applied mask if return_mask is True.
"""
# get info
new_volume = volume.copy() if return_copy else volume
vol_shape = list(new_volume.shape)
n_dims, n_channels = utils.get_dims(vol_shape)
# get mask and erode/dilate it
if mask is None:
mask = new_volume >= threshold
else:
assert list(mask.shape[:n_dims]) == vol_shape[:n_dims], 'mask should have shape {0}, or {1}, had {2}'.format(
vol_shape[:n_dims], vol_shape[:n_dims] + [n_channels], list(mask.shape))
mask = mask > 0
if dilate > 0:
dilate_struct = utils.build_binary_structure(dilate, n_dims)
mask_to_apply = binary_dilation(mask, dilate_struct)
else:
mask_to_apply = mask
if erode > 0:
erode_struct = utils.build_binary_structure(erode, n_dims)
mask_to_apply = binary_erosion(mask_to_apply, erode_struct)
if fill_holes:
mask_to_apply = binary_fill_holes(mask_to_apply)
# replace values outside of mask by padding_char
if mask_to_apply.shape == new_volume.shape:
new_volume[np.logical_not(mask_to_apply)] = masking_value
else:
new_volume[np.stack([np.logical_not(mask_to_apply)] * n_channels, axis=-1)] = masking_value
if return_mask:
return new_volume, mask_to_apply
else:
return new_volume
def rescale_volume(volume, new_min=0, new_max=255, min_percentile=2, max_percentile=98, use_positive_only=False):
"""This function linearly rescales a volume between new_min and new_max.
:param volume: a numpy array
:param new_min: (optional) minimum value for the rescaled image.
:param new_max: (optional) maximum value for the rescaled image.
:param min_percentile: (optional) percentile for estimating robust minimum of volume (float in [0,...100]),
where 0 = np.min
:param max_percentile: (optional) percentile for estimating robust maximum of volume (float in [0,...100]),
where 100 = np.max
:param use_positive_only: (optional) whether to use only positive values when estimating the min and max percentile
:return: rescaled volume
"""
# select only positive intensities
new_volume = volume.copy()
intensities = new_volume[new_volume > 0] if use_positive_only else new_volume.flatten()
# define min and max intensities in original image for normalisation
robust_min = np.min(intensities) if min_percentile == 0 else np.percentile(intensities, min_percentile)
robust_max = np.max(intensities) if max_percentile == 100 else np.percentile(intensities, max_percentile)
# trim values outside range
new_volume = np.clip(new_volume, robust_min, robust_max)
# rescale image
if robust_min != robust_max:
return new_min + (new_volume - robust_min) / (robust_max - robust_min) * (new_max - new_min)
else: # avoid dividing by zero
return np.zeros_like(new_volume)
def crop_volume(volume, cropping_margin=None, cropping_shape=None, aff=None, return_crop_idx=False, mode='center'):
"""Crop volume by a given margin, or to a given shape.
:param volume: 2d or 3d numpy array (possibly with multiple channels)
:param cropping_margin: (optional) margin by which to crop the volume. The cropping margin is applied on both sides.
Can be an int, sequence or 1d numpy array of size n_dims. Should be given if cropping_shape is None.
:param cropping_shape: (optional) shape to which the volume will be cropped. Can be an int, sequence or 1d numpy
array of size n_dims. Should be given if cropping_margin is None.
:param aff: (optional) affine matrix of the input volume.
If not None, this function also returns an updated version of the affine matrix for the cropped volume.
:param return_crop_idx: (optional) whether to return the cropping indices used to crop the given volume.
:param mode: (optional) if cropping_shape is not None, whether to extract the centre of the image (mode='center'),
or to randomly crop the volume to the provided shape (mode='random'). Default is 'center'.
:return: cropped volume, corresponding affine matrix if aff is not None, and cropping indices if return_crop_idx is
True (in that order).
"""
assert (cropping_margin is not None) | (cropping_shape is not None), \
'cropping_margin or cropping_shape should be provided'
assert not ((cropping_margin is not None) & (cropping_shape is not None)), \
'only one of cropping_margin or cropping_shape should be provided'
# get info
new_volume = volume.copy()
vol_shape = new_volume.shape
n_dims, _ = utils.get_dims(vol_shape)
# find cropping indices
if cropping_margin is not None:
cropping_margin = utils.reformat_to_list(cropping_margin, length=n_dims)
do_cropping = np.array(vol_shape[:n_dims]) > 2 * np.array(cropping_margin)
min_crop_idx = [cropping_margin[i] if do_cropping[i] else 0 for i in range(n_dims)]
max_crop_idx = [vol_shape[i] - cropping_margin[i] if do_cropping[i] else vol_shape[i] for i in range(n_dims)]
else:
cropping_shape = utils.reformat_to_list(cropping_shape, length=n_dims)
if mode == 'center':
min_crop_idx = np.maximum([int((vol_shape[i] - cropping_shape[i]) / 2) for i in range(n_dims)], 0)
max_crop_idx = np.minimum([min_crop_idx[i] + cropping_shape[i] for i in range(n_dims)],
np.array(vol_shape)[:n_dims])
elif mode == 'random':
crop_max_val = np.maximum(np.array([vol_shape[i] - cropping_shape[i] for i in range(n_dims)]), 0)
min_crop_idx = np.random.randint(0, high=crop_max_val + 1)
max_crop_idx = np.minimum(min_crop_idx + np.array(cropping_shape), np.array(vol_shape)[:n_dims])
else:
raise ValueError('mode should be either "center" or "random", had %s' % mode)
crop_idx = np.concatenate([np.array(min_crop_idx), np.array(max_crop_idx)])
# crop volume
if n_dims == 2:
new_volume = new_volume[crop_idx[0]: crop_idx[2], crop_idx[1]: crop_idx[3], ...]
elif n_dims == 3:
new_volume = new_volume[crop_idx[0]: crop_idx[3], crop_idx[1]: crop_idx[4], crop_idx[2]: crop_idx[5], ...]
# sort outputs
output = [new_volume]
if aff is not None:
aff[0:3, -1] = aff[0:3, -1] + aff[:3, :3] @ np.array(min_crop_idx)
output.append(aff)
if return_crop_idx:
output.append(crop_idx)
return output[0] if len(output) == 1 else tuple(output)
def crop_volume_around_region(volume,
mask=None,
masking_labels=None,
threshold=0.1,
margin=0,
cropping_shape=None,
cropping_shape_div_by=None,
aff=None,
overflow='strict'):
"""Crop a volume around a specific region.
This region is defined by a mask obtained by either:
1) directly specifying it as input (see mask)
2) keeping a set of label values if the volume is a label map (see masking_labels).
3) thresholding the input volume (see threshold)
The cropping region is defined by the bounding box of the mask, which we can further modify by either:
1) extending it by a margin (see margin)
2) providing a specific cropping shape, in this case the cropping region will be centered around the bounding box
(see cropping_shape).
3) extending it to a shape that is divisible by a given number. Again, the cropping region will be centered around
the bounding box (see cropping_shape_div_by).
Finally, if the size of the cropping region has been modified, and that this modified size overflows out of the
image (e.g. because the center of the mask is close to the edge), we can either:
1) stick to the valid image space (the size of the modified cropping region won't be respected)
2) shift the cropping region so that it lies on the valid image space, and if it still overflows, then we restrict
to the valid image space.
3) pad the image with zeros, such that the cropping region is not ill-defined anymore.
3) shift the cropping region to the valida image space, and if it still overflows, then we pad with zeros.
:param volume: a 2d or 3d numpy array
:param mask: (optional) mask of region to crop around. Must be same size as volume. Can either be boolean or 0/1.
If no mask is given, it will be computed by either thresholding the input volume or using masking_labels.
:param masking_labels: (optional) if mask is None, and if the volume is a label map, it can be cropped around a
set of labels specified in masking_labels, which can either be a single int, a sequence or a 1d numpy array.
:param threshold: (optional) if mask amd masking_labels are None, lower bound to determine values to crop around.
:param margin: (optional) add margin around mask
:param cropping_shape: (optional) shape to which the input volumes must be cropped. Volumes are padded around the
centre of the above-defined mask is they are too small for the given shape. Can be an integer or sequence.
Cannot be given at the same time as margin or cropping_shape_div_by.
:param cropping_shape_div_by: (optional) makes sure the shape of the cropped region is divisible by the provided
number. If it is not, then we enlarge the cropping area. If the enlarged area is too big for the input volume, we
pad it with 0. Must be an integer. Cannot be given at the same time as margin or cropping_shape.
:param aff: (optional) if specified, this function returns an updated affine matrix of the volume after cropping.
:param overflow: (optional) how to proceed when the cropping region overflows outside the initial image space.
Can either be 'strict' (default), 'shift-strict', 'padding', 'shift-padding.
:return: the cropped volume, the cropping indices (in the order [lower_bound_dim_1, ..., upper_bound_dim_1, ...]),
and the updated affine matrix if aff is not None.
"""
assert not ((margin > 0) & (cropping_shape is not None)), "margin and cropping_shape can't be given together."
assert not ((margin > 0) & (cropping_shape_div_by is not None)), \
"margin and cropping_shape_div_by can't be given together."
assert not ((cropping_shape_div_by is not None) & (cropping_shape is not None)), \
"cropping_shape_div_by and cropping_shape can't be given together."
new_vol = volume.copy()
n_dims, n_channels = utils.get_dims(new_vol.shape)
vol_shape = np.array(new_vol.shape[:n_dims])
# mask ROIs for cropping
if mask is None:
if masking_labels is not None:
_, mask = mask_label_map(new_vol, masking_values=masking_labels, return_mask=True)
else:
mask = new_vol > threshold
# find cropping indices
if np.any(mask):
indices = np.nonzero(mask)
min_idx = np.array([np.min(idx) for idx in indices])
max_idx = np.array([np.max(idx) for idx in indices])
intermediate_vol_shape = max_idx - min_idx
if (margin == 0) & (cropping_shape is None) & (cropping_shape_div_by is None):
cropping_shape = intermediate_vol_shape
if margin:
cropping_shape = intermediate_vol_shape + 2 * margin
elif cropping_shape is not None:
cropping_shape = np.array(utils.reformat_to_list(cropping_shape, length=n_dims))
elif cropping_shape_div_by is not None:
cropping_shape = [utils.find_closest_number_divisible_by_m(s, cropping_shape_div_by, answer_type='higher')
for s in intermediate_vol_shape]
min_idx = min_idx - np.int32(np.ceil((cropping_shape - intermediate_vol_shape) / 2))
max_idx = max_idx + np.int32(np.floor((cropping_shape - intermediate_vol_shape) / 2))
min_overflow = np.abs(np.minimum(min_idx, 0))
max_overflow = np.maximum(max_idx - vol_shape, 0)
if 'strict' in overflow:
min_overflow = np.zeros_like(min_overflow)
max_overflow = np.zeros_like(min_overflow)
if overflow == 'shift-strict':
min_idx -= max_overflow
max_idx += min_overflow
if overflow == 'shift-padding':
for ii in range(n_dims):
# no need to do anything if both min/max_overflow are 0 (no padding/shifting required at all)
# or if both are positive, because in this case we don't shift at all and we pad directly
if (min_overflow[ii] > 0) & (max_overflow[ii] == 0):
max_idx_new = max_idx[ii] + min_overflow[ii]
if max_idx_new <= vol_shape[ii]:
max_idx[ii] = max_idx_new
min_overflow[ii] = 0
else:
min_overflow[ii] = min_overflow[ii] - (vol_shape[ii] - max_idx[ii])
max_idx[ii] = vol_shape[ii]
elif (min_overflow[ii] == 0) & (max_overflow[ii] > 0):
min_idx_new = min_idx[ii] - max_overflow[ii]
if min_idx_new >= 0:
min_idx[ii] = min_idx_new
max_overflow[ii] = 0
else:
max_overflow[ii] = max_overflow[ii] - min_idx[ii]
min_idx[ii] = 0
# crop volume if necessary
min_idx = np.maximum(min_idx, 0)
max_idx = np.minimum(max_idx, vol_shape)
cropping = np.concatenate([min_idx, max_idx])
if np.any(cropping[:3] > 0) or np.any(cropping[3:] != vol_shape):
if n_dims == 3:
new_vol = new_vol[cropping[0]:cropping[3], cropping[1]:cropping[4], cropping[2]:cropping[5], ...]
elif n_dims == 2:
new_vol = new_vol[cropping[0]:cropping[2], cropping[1]:cropping[3], ...]
else:
raise ValueError('cannot crop volumes with more than 3 dimensions')
# pad volume if necessary
if np.any(min_overflow > 0) | np.any(max_overflow > 0):
pad_margins = tuple([(min_overflow[i], max_overflow[i]) for i in range(n_dims)])
pad_margins = tuple(list(pad_margins) + [(0, 0)]) if n_channels > 1 else pad_margins
new_vol = np.pad(new_vol, pad_margins, mode='constant', constant_values=0)
# if there's nothing to crop around, we return the input as is
else:
min_idx = min_overflow = np.zeros(3)
cropping = None
# return results
if aff is not None:
if n_dims == 2:
min_idx = np.append(min_idx, 0)
min_overflow = np.append(min_overflow, 0)
aff[0:3, -1] = aff[0:3, -1] + aff[:3, :3] @ min_idx
aff[:-1, -1] = aff[:-1, -1] - aff[:-1, :-1] @ min_overflow
return new_vol, cropping, aff
else:
return new_vol, cropping
def crop_volume_with_idx(volume, crop_idx, aff=None, n_dims=None, return_copy=True):
"""Crop a volume with given indices.
:param volume: a 2d or 3d numpy array
:param crop_idx: cropping indices, in the order [lower_bound_dim_1, ..., upper_bound_dim_1, ...].
Can be a list or a 1d numpy array.
:param aff: (optional) if aff is specified, this function returns an updated affine matrix of the volume after
cropping.
:param n_dims: (optional) number of dimensions (excluding channels) of the volume. If not provided, n_dims will be
inferred from the input volume.
:param return_copy: (optional) whether to return the original volume or a copy. Default is copy.
:return: the cropped volume, and the updated affine matrix if aff is not None.
"""
# get info
new_volume = volume.copy() if return_copy else volume
n_dims = int(np.array(crop_idx).shape[0] / 2) if n_dims is None else n_dims
# crop image
if n_dims == 2:
new_volume = new_volume[crop_idx[0]:crop_idx[2], crop_idx[1]:crop_idx[3], ...]
elif n_dims == 3:
new_volume = new_volume[crop_idx[0]:crop_idx[3], crop_idx[1]:crop_idx[4], crop_idx[2]:crop_idx[5], ...]
else:
raise Exception('cannot crop volumes with more than 3 dimensions')
if aff is not None:
aff[0:3, -1] = aff[0:3, -1] + aff[:3, :3] @ crop_idx[:3]
return new_volume, aff
else:
return new_volume
def pad_volume(volume, padding_shape, padding_value=0, aff=None, return_pad_idx=False):
"""Pad volume to a given shape
:param volume: volume to be padded
:param padding_shape: shape to pad volume to. Can be a number, a sequence or a 1d numpy array.
:param padding_value: (optional) value used for padding
:param aff: (optional) affine matrix of the volume
:param return_pad_idx: (optional) the pad_idx corresponds to the indices where we should crop the resulting
padded image (ie the output of this function) to go back to the original volume (ie the input of this function).
:return: padded volume, and updated affine matrix if aff is not None.
"""
# get info
new_volume = volume.copy()
vol_shape = new_volume.shape
n_dims, n_channels = utils.get_dims(vol_shape)
padding_shape = utils.reformat_to_list(padding_shape, length=n_dims, dtype='int')
# check if need to pad
if np.any(np.array(padding_shape, dtype='int32') > np.array(vol_shape[:n_dims], dtype='int32')):
# get padding margins
min_margins = np.maximum(np.int32(np.floor((np.array(padding_shape) - np.array(vol_shape)[:n_dims]) / 2)), 0)
max_margins = np.maximum(np.int32(np.ceil((np.array(padding_shape) - np.array(vol_shape)[:n_dims]) / 2)), 0)
pad_idx = np.concatenate([min_margins, min_margins + np.array(vol_shape[:n_dims])])
pad_margins = tuple([(min_margins[i], max_margins[i]) for i in range(n_dims)])
if n_channels > 1:
pad_margins = tuple(list(pad_margins) + [(0, 0)])
# pad volume
new_volume = np.pad(new_volume, pad_margins, mode='constant', constant_values=padding_value)
if aff is not None:
if n_dims == 2:
min_margins = np.append(min_margins, 0)
aff[:-1, -1] = aff[:-1, -1] - aff[:-1, :-1] @ min_margins
else:
pad_idx = np.concatenate([np.array([0] * n_dims), np.array(vol_shape[:n_dims])])
# sort outputs
output = [new_volume]
if aff is not None:
output.append(aff)
if return_pad_idx:
output.append(pad_idx)
return output[0] if len(output) == 1 else tuple(output)
def flip_volume(volume, axis=None, direction=None, aff=None, return_copy=True):
"""Flip volume along a specified axis.
If unknown, this axis can be inferred from an affine matrix with a specified anatomical direction.
:param volume: a numpy array
:param axis: (optional) axis along which to flip the volume. Can either be an int or a tuple.
:param direction: (optional) if axis is None, the volume can be flipped along an anatomical direction:
'rl' (right/left), 'ap' anterior/posterior), 'si' (superior/inferior).
:param aff: (optional) please provide an affine matrix if direction is not None
:param return_copy: (optional) whether to return the original volume or a copy. Default is copy.
:return: flipped volume
"""
new_volume = volume.copy() if return_copy else volume
assert (axis is not None) | ((aff is not None) & (direction is not None)), \
'please provide either axis, or an affine matrix with a direction'
# get flipping axis from aff if axis not provided
if (axis is None) & (aff is not None):
volume_axes = get_ras_axes(aff)
if direction == 'rl':
axis = volume_axes[0]
elif direction == 'ap':
axis = volume_axes[1]
elif direction == 'si':
axis = volume_axes[2]
else:
raise ValueError("direction should be 'rl', 'ap', or 'si', had %s" % direction)
# flip volume
return np.flip(new_volume, axis=axis)
def resample_volume(volume, aff, new_vox_size, interpolation='linear', blur=True):
"""This function resizes the voxels of a volume to a new provided size, while adjusting the header to keep the RAS
:param volume: a numpy array
:param aff: affine matrix of the volume
:param new_vox_size: new voxel size (3 - element numpy vector) in mm
:param interpolation: (optional) type of interpolation. Can be 'linear' or 'nearest'. Default is 'linear'.
:param blur: (optional) whether to blur before resampling to avoid aliasing effects.
Only used if the input volume is downsampled. Default is True.
:return: new volume and affine matrix
"""
pixdim = np.sqrt(np.sum(aff * aff, axis=0))[:-1]
new_vox_size = np.array(new_vox_size)
factor = pixdim / new_vox_size
sigmas = 0.25 / factor
sigmas[factor > 1] = 0 # don't blur if upsampling
volume_filt = gaussian_filter(volume, sigmas) if blur else volume
# volume2 = zoom(volume_filt, factor, order=1, mode='reflect', prefilter=False)
x = np.arange(0, volume_filt.shape[0])
y = np.arange(0, volume_filt.shape[1])
z = np.arange(0, volume_filt.shape[2])
my_interpolating_function = RegularGridInterpolator((x, y, z), volume_filt, method=interpolation)
start = - (factor - 1) / (2 * factor)
step = 1.0 / factor
stop = start + step * np.ceil(volume_filt.shape * factor)
xi = np.arange(start=start[0], stop=stop[0], step=step[0])
yi = np.arange(start=start[1], stop=stop[1], step=step[1])
zi = np.arange(start=start[2], stop=stop[2], step=step[2])
xi[xi < 0] = 0
yi[yi < 0] = 0
zi[zi < 0] = 0
xi[xi > (volume_filt.shape[0] - 1)] = volume_filt.shape[0] - 1
yi[yi > (volume_filt.shape[1] - 1)] = volume_filt.shape[1] - 1
zi[zi > (volume_filt.shape[2] - 1)] = volume_filt.shape[2] - 1
xig, yig, zig = np.meshgrid(xi, yi, zi, indexing='ij', sparse=True)
volume2 = my_interpolating_function((xig, yig, zig))
aff2 = aff.copy()
for c in range(3):
aff2[:-1, c] = aff2[:-1, c] / factor[c]
aff2[:-1, -1] = aff2[:-1, -1] - np.matmul(aff2[:-1, :-1], 0.5 * (factor - 1))
return volume2, aff2
def resample_volume_like(vol_ref, aff_ref, vol_flo, aff_flo, interpolation='linear'):
"""This function reslices a floating image to the space of a reference image
:param vol_ref: a numpy array with the reference volume
:param aff_ref: affine matrix of the reference volume
:param vol_flo: a numpy array with the floating volume
:param aff_flo: affine matrix of the floating volume
:param interpolation: (optional) type of interpolation. Can be 'linear' or 'nearest'. Default is 'linear'.
:return: resliced volume
"""
T = np.matmul(np.linalg.inv(aff_flo), aff_ref)
xf = np.arange(0, vol_flo.shape[0])
yf = np.arange(0, vol_flo.shape[1])
zf = np.arange(0, vol_flo.shape[2])
my_interpolating_function = RegularGridInterpolator((xf, yf, zf), vol_flo, bounds_error=False, fill_value=0.0,
method=interpolation)
xr = np.arange(0, vol_ref.shape[0])
yr = np.arange(0, vol_ref.shape[1])
zr = np.arange(0, vol_ref.shape[2])
xrg, yrg, zrg = np.meshgrid(xr, yr, zr, indexing='ij', sparse=False)
n = xrg.size
xrg = xrg.reshape([n])
yrg = yrg.reshape([n])
zrg = zrg.reshape([n])
bottom = np.ones_like(xrg)
coords = np.stack([xrg, yrg, zrg, bottom])
coords_new = np.matmul(T, coords)[:-1, :]
result = my_interpolating_function((coords_new[0, :], coords_new[1, :], coords_new[2, :]))
return result.reshape(vol_ref.shape)
def get_ras_axes(aff, n_dims=3):
"""This function finds the RAS axes corresponding to each dimension of a volume, based on its affine matrix.
:param aff: affine matrix Can be a 2d numpy array of size n_dims*n_dims, n_dims+1*n_dims+1, or n_dims*n_dims+1.
:param n_dims: number of dimensions (excluding channels) of the volume corresponding to the provided affine matrix.
:return: two numpy 1d arrays of length n_dims, one with the axes corresponding to RAS orientations,
and one with their corresponding direction.
"""
aff_inverted = np.linalg.inv(aff)
img_ras_axes = np.argmax(np.absolute(aff_inverted[0:n_dims, 0:n_dims]), axis=0)
for i in range(n_dims):
if i not in img_ras_axes:
unique, counts = np.unique(img_ras_axes, return_counts=True)
incorrect_value = unique[np.argmax(counts)]
img_ras_axes[np.where(img_ras_axes == incorrect_value)[0][-1]] = i
return img_ras_axes
def align_volume_to_ref(volume, aff, aff_ref=None, return_aff=False, n_dims=None, return_copy=True):
"""This function aligns a volume to a reference orientation (axis and direction) specified by an affine matrix.
:param volume: a numpy array
:param aff: affine matrix of the floating volume
:param aff_ref: (optional) affine matrix of the target orientation. Default is identity matrix.
:param return_aff: (optional) whether to return the affine matrix of the aligned volume
:param n_dims: (optional) number of dimensions (excluding channels) of the volume. If not provided, n_dims will be
inferred from the input volume.
:param return_copy: (optional) whether to return the original volume or a copy. Default is copy.
:return: aligned volume, with corresponding affine matrix if return_aff is True.
"""
# work on copy
new_volume = volume.copy() if return_copy else volume
aff_flo = aff.copy()
# default value for aff_ref
if aff_ref is None:
aff_ref = np.eye(4)
# extract ras axes
if n_dims is None:
n_dims, _ = utils.get_dims(new_volume.shape)
ras_axes_ref = get_ras_axes(aff_ref, n_dims=n_dims)
ras_axes_flo = get_ras_axes(aff_flo, n_dims=n_dims)
# align axes
aff_flo[:, ras_axes_ref] = aff_flo[:, ras_axes_flo]
for i in range(n_dims):
if ras_axes_flo[i] != ras_axes_ref[i]:
new_volume = np.swapaxes(new_volume, ras_axes_flo[i], ras_axes_ref[i])
swapped_axis_idx = np.where(ras_axes_flo == ras_axes_ref[i])
ras_axes_flo[swapped_axis_idx], ras_axes_flo[i] = ras_axes_flo[i], ras_axes_flo[swapped_axis_idx]
# align directions
dot_products = np.sum(aff_flo[:3, :3] * aff_ref[:3, :3], axis=0)
for i in range(n_dims):
if dot_products[i] < 0:
new_volume = np.flip(new_volume, axis=i)
aff_flo[:, i] = - aff_flo[:, i]
aff_flo[:3, 3] = aff_flo[:3, 3] - aff_flo[:3, i] * (new_volume.shape[i] - 1)
if return_aff:
return new_volume, aff_flo
else:
return new_volume
def blur_volume(volume, sigma, mask=None):
"""Blur volume with gaussian masks of given sigma.
:param volume: 2d or 3d numpy array
:param sigma: standard deviation of the gaussian kernels. Can be a number, a sequence or a 1d numpy array
:param mask: (optional) numpy array of the same shape as volume to correct for edge blurring effects.
Mask can be a boolean or numerical array. In the latter, the mask is computed by keeping all values above zero.
:return: blurred volume
"""
# initialisation
new_volume = volume.copy()
n_dims, _ = utils.get_dims(new_volume.shape)
sigma = utils.reformat_to_list(sigma, length=n_dims, dtype='float')
# blur image
new_volume = gaussian_filter(new_volume, sigma=sigma, mode='nearest') # nearest refers to edge padding
# correct edge effect if mask is not None
if mask is not None:
assert new_volume.shape == mask.shape, 'volume and mask should have the same dimensions: ' \
'got {0} and {1}'.format(new_volume.shape, mask.shape)
mask = (mask > 0) * 1.0
blurred_mask = gaussian_filter(mask, sigma=sigma, mode='nearest')
new_volume = new_volume / (blurred_mask + 1e-6)
new_volume[mask == 0] = 0
return new_volume
# --------------------------------------------------- edit label map ---------------------------------------------------
def correct_label_map(labels, list_incorrect_labels, list_correct_labels=None, use_nearest_label=False,
remove_zero=False, smooth=False):
"""This function corrects specified label values in a label map by either a list of given values, or by the nearest
label.
:param labels: a 2d or 3d label map
:param list_incorrect_labels: list of all label values to correct (eg [1, 2, 3]). Can also be a path to such a list.
:param list_correct_labels: (optional) list of correct label values to replace the incorrect ones.
Correct values must have the same order as their corresponding value in list_incorrect_labels.
When several correct values are possible for the same incorrect value, the nearest correct value will be selected at
each voxel to correct. In that case, the different correct values must be specified inside a list within
list_correct_labels (e.g. [10, 20, 30, [40, 50]).
:param use_nearest_label: (optional) whether to correct the incorrect label values with the nearest labels.
:param remove_zero: (optional) if use_nearest_label is True, set to True not to consider zero among the potential
candidates for the nearest neighbour. -1 will be returned when no solution are possible.
:param smooth: (optional) whether to smooth the corrected label map
:return: corrected label map
"""
assert (list_correct_labels is not None) | use_nearest_label, \
'please provide a list of correct labels, or set use_nearest_label to True.'
assert (list_correct_labels is None) | (not use_nearest_label), \
'cannot provide a list of correct values and set use_nearest_label to True'
# initialisation
new_labels = labels.copy()
list_incorrect_labels = utils.reformat_to_list(utils.load_array_if_path(list_incorrect_labels))
volume_labels = np.unique(labels)
n_dims, _ = utils.get_dims(labels.shape)
# use list of correct values
if list_correct_labels is not None:
list_correct_labels = utils.reformat_to_list(utils.load_array_if_path(list_correct_labels))
# loop over label values
for incorrect_label, correct_label in zip(list_incorrect_labels, list_correct_labels):
if incorrect_label in volume_labels:
# only one possible value to replace with
if isinstance(correct_label, (int, float, np.int64, np.int32, np.int16, np.int8)):
incorrect_voxels = np.where(labels == incorrect_label)
new_labels[incorrect_voxels] = correct_label
# several possibilities
elif isinstance(correct_label, (tuple, list)):
# make sure at least one correct label is present
if not any([lab in volume_labels for lab in correct_label]):
print('no correct values found in volume, please adjust: '
'incorrect: {}, correct: {}'.format(incorrect_label, correct_label))
# crop around incorrect label until we find incorrect labels
correct_label_not_found = True
margin_mult = 1
tmp_labels = None
crop = None
while correct_label_not_found:
tmp_labels, crop = crop_volume_around_region(labels,
masking_labels=incorrect_label,
margin=10 * margin_mult)
correct_label_not_found = not any([lab in np.unique(tmp_labels) for lab in correct_label])
margin_mult += 1
# calculate distance maps for all new label candidates
incorrect_voxels = np.where(tmp_labels == incorrect_label)
distance_map_list = [distance_transform_edt(tmp_labels != lab) for lab in correct_label]
distances_correct = np.stack([dist[incorrect_voxels] for dist in distance_map_list])
# select nearest values and use them to correct label map
idx_correct_lab = np.argmin(distances_correct, axis=0)
incorrect_voxels = tuple([incorrect_voxels[i] + crop[i] for i in range(n_dims)])
new_labels[incorrect_voxels] = np.array(correct_label)[idx_correct_lab]
# use nearest label
else:
# loop over label values
for incorrect_label in list_incorrect_labels:
if incorrect_label in volume_labels:
# loop around regions
components, n_components = scipy_label(labels == incorrect_label)
loop_info = utils.LoopInfo(n_components + 1, 100, 'correcting')
for i in range(1, n_components + 1):
loop_info.update(i)
# crop each region
_, crop = crop_volume_around_region(components, masking_labels=i, margin=1)
tmp_labels = crop_volume_with_idx(labels, crop)
tmp_new_labels = crop_volume_with_idx(new_labels, crop)
# list all possible correct labels
correct_labels = np.unique(tmp_labels)
for il in list_incorrect_labels:
correct_labels = np.delete(correct_labels, np.where(correct_labels == il))
if remove_zero:
correct_labels = np.delete(correct_labels, np.where(correct_labels == 0))
# replace incorrect voxels by new value
incorrect_voxels = np.where(tmp_labels == incorrect_label)
if len(correct_labels) == 0:
tmp_new_labels[incorrect_voxels] = -1
else:
if len(correct_labels) == 1:
idx_correct_lab = np.zeros(len(incorrect_voxels[0]), dtype='int32')
else:
distance_map_list = [distance_transform_edt(tmp_labels != lab) for lab in correct_labels]
distances_correct = np.stack([dist[incorrect_voxels] for dist in distance_map_list])
idx_correct_lab = np.argmin(distances_correct, axis=0)
tmp_new_labels[incorrect_voxels] = np.array(correct_labels)[idx_correct_lab]
# paste back
if n_dims == 2:
new_labels[crop[0]:crop[2], crop[1]:crop[3], ...] = tmp_new_labels
else:
new_labels[crop[0]:crop[3], crop[1]:crop[4], crop[2]:crop[5], ...] = tmp_new_labels
# smoothing
if smooth:
kernel = np.ones(tuple([3] * n_dims))
new_labels = smooth_label_map(new_labels, kernel)
return new_labels
def mask_label_map(labels, masking_values, masking_value=0, return_mask=False):
"""
This function masks a label map around a list of specified values.
:param labels: input label map
:param masking_values: list of values to mask around
:param masking_value: (optional) value to mask the label map with
:param return_mask: (optional) whether to return the applied mask
:return: the masked label map, and the applied mask if return_mask is True.
"""
# build mask and mask labels
mask = np.zeros(labels.shape, dtype=bool)
masked_labels = labels.copy()
for value in utils.reformat_to_list(masking_values):
mask = mask | (labels == value)
masked_labels[np.logical_not(mask)] = masking_value
if return_mask:
mask = mask * 1
return masked_labels, mask
else:
return masked_labels
def smooth_label_map(labels, kernel, labels_list=None, print_progress=0):
"""This function smooth an input label map by replacing each voxel by the value of its most numerous neighbour.
:param labels: input label map
:param kernel: kernel when counting neighbours. Must contain only zeros or ones.
:param labels_list: list of label values to smooth. Defaults is None, where all labels are smoothed.
:param print_progress: (optional) If not 0, interval at which to print the number of processed labels.
:return: smoothed label map
"""
# get info
labels_shape = labels.shape
unique_labels = np.unique(labels).astype('int32')
if labels_list is None:
labels_list = unique_labels
new_labels = mask_new_labels = None
else:
labels_to_keep = [lab for lab in unique_labels if lab not in labels_list]
new_labels, mask_new_labels = mask_label_map(labels, labels_to_keep, return_mask=True)
# loop through label values
count = np.zeros(labels_shape)
labels_smoothed = np.zeros(labels_shape, dtype='int')
loop_info = utils.LoopInfo(len(labels_list), print_progress, 'smoothing')
for la, label in enumerate(labels_list):
if print_progress:
loop_info.update(la)
# count neighbours with same value
mask = (labels == label) * 1
n_neighbours = convolve(mask, kernel)
# update label map and maximum neighbour counts
idx = n_neighbours > count
count[idx] = n_neighbours[idx]
labels_smoothed[idx] = label
labels_smoothed = labels_smoothed.astype('int32')
if new_labels is None:
new_labels = labels_smoothed
else:
new_labels = np.where(mask_new_labels, new_labels, labels_smoothed)
return new_labels
def erode_label_map(labels, labels_to_erode, erosion_factors=1., gpu=False, model=None, return_model=False):
"""Erode a given set of label values within a label map.
:param labels: a 2d or 3d label map
:param labels_to_erode: list of label values to erode
:param erosion_factors: (optional) list of erosion factors to use for each label. If values are integers, normal
erosion applies. If float, we first 1) blur a mask of the corresponding label value, and 2) use the erosion factor
as a threshold in the blurred mask.
If erosion_factors is a single value, the same factor will be applied to all labels.
:param gpu: (optional) whether to use a fast gpu model for blurring (if erosion factors are floats)
:param model: (optional) gpu model for blurring masks (if erosion factors are floats)
:param return_model: (optional) whether to return the gpu blurring model
:return: eroded label map, and gpu blurring model is return_model is True.
"""
# reformat labels_to_erode and erode
new_labels = labels.copy()
labels_to_erode = utils.reformat_to_list(labels_to_erode)
erosion_factors = utils.reformat_to_list(erosion_factors, length=len(labels_to_erode))
labels_shape = list(new_labels.shape)
n_dims, _ = utils.get_dims(labels_shape)
# loop over labels to erode
for label_to_erode, erosion_factor in zip(labels_to_erode, erosion_factors):
assert erosion_factor > 0, 'all erosion factors should be strictly positive, had {}'.format(erosion_factor)
# get mask of current label value
mask = (new_labels == label_to_erode)
# erode as usual if erosion factor is int
if int(erosion_factor) == erosion_factor:
erode_struct = utils.build_binary_structure(int(erosion_factor), n_dims)
eroded_mask = binary_erosion(mask, erode_struct)
# blur mask and use erosion factor as a threshold if float
else:
if gpu:
if model is None:
mask_in = KL.Input(shape=labels_shape + [1], dtype='float32')
blurred_mask = GaussianBlur([1] * 3)(mask_in)
model = Model(inputs=mask_in, outputs=blurred_mask)
eroded_mask = model.predict(utils.add_axis(np.float32(mask), axis=[0, -1]))
else:
eroded_mask = blur_volume(np.array(mask, dtype='float32'), 1)
eroded_mask = np.squeeze(eroded_mask) > erosion_factor
# crop label map and mask around values to change
mask = mask & np.logical_not(eroded_mask)
cropped_lab_mask, cropping = crop_volume_around_region(mask, margin=3)
cropped_labels = crop_volume_with_idx(new_labels, cropping)
# calculate distance maps for all labels in cropped_labels
labels_list = np.unique(cropped_labels)
labels_list = labels_list[labels_list != label_to_erode]
list_dist_maps = [distance_transform_edt(np.logical_not(cropped_labels == la)) for la in labels_list]
candidate_distances = np.stack([dist[cropped_lab_mask] for dist in list_dist_maps])
# select nearest value and put cropped labels back to full label map
idx_correct_lab = np.argmin(candidate_distances, axis=0)
cropped_labels[cropped_lab_mask] = np.array(labels_list)[idx_correct_lab]
if n_dims == 2:
new_labels[cropping[0]:cropping[2], cropping[1]:cropping[3], ...] = cropped_labels
elif n_dims == 3:
new_labels[cropping[0]:cropping[3], cropping[1]:cropping[4], cropping[2]:cropping[5], ...] = cropped_labels
if return_model:
return new_labels, model
else:
return new_labels
def get_largest_connected_component(mask, structure=None):
"""Function to get the largest connected component for a given input.
:param mask: a 2d or 3d label map of boolean type.
:param structure: numpy array defining the connectivity.
"""
components, n_components = scipy_label(mask, structure)
return components == np.argmax(np.bincount(components.flat)[1:]) + 1 if n_components > 0 else mask.copy()
def compute_hard_volumes(labels, voxel_volume=1., label_list=None, skip_background=True):
"""Compute hard volumes in a label map.
:param labels: a label map
:param voxel_volume: (optional) volume of voxel. Default is 1 (i.e. returned volumes are voxel counts).
:param label_list: (optional) list of labels to compute volumes for. Can be an int, a sequence, or a numpy array.
If None, the volumes of all label values are computed.
:param skip_background: (optional) whether to skip computing the volume of the background.
If label_list is None, this assumes background value is 0.
If label_list is not None, this assumes the background is the first value in label list.
:return: numpy 1d vector with the volumes of each structure
"""
# initialisation
subject_label_list = utils.reformat_to_list(np.unique(labels), dtype='int')
if label_list is None:
label_list = subject_label_list
else:
label_list = utils.reformat_to_list(label_list)
if skip_background:
label_list = label_list[1:]
volumes = np.zeros(len(label_list))
# loop over label values
for idx, label in enumerate(label_list):
if label in subject_label_list:
mask = (labels == label) * 1
volumes[idx] = np.sum(mask)
else:
volumes[idx] = 0
return volumes * voxel_volume
def compute_distance_map(labels, masking_labels=None, crop_margin=None):
"""Compute distance map for a given list of label values in a label map.
:param labels: a label map
:param masking_labels: (optional) list of label values to mask the label map with. The distances will be computed
for these labels only. Default is None, where all positive values are considered.
:param crop_margin: (optional) margin with which to crop the input label maps around the labels for which we
want to compute the distance maps.
:return: a distance map with positive values inside the considered regions, and negative values outside."""
n_dims, _ = utils.get_dims(labels.shape)
# crop label map if necessary
if crop_margin is not None:
tmp_labels, crop_idx = crop_volume_around_region(labels, margin=crop_margin)
else:
tmp_labels = labels
crop_idx = None
# mask label map around specify values
if masking_labels is not None:
masking_labels = utils.reformat_to_list(masking_labels)
mask = np.zeros(tmp_labels.shape, dtype='bool')
for masking_label in masking_labels:
mask = mask | tmp_labels == masking_label
else:
mask = tmp_labels > 0
not_mask = np.logical_not(mask)
# compute distances
dist_in = distance_transform_edt(mask)
dist_in = np.where(mask, dist_in - 0.5, dist_in)
dist_out = - distance_transform_edt(not_mask)
dist_out = np.where(not_mask, dist_out + 0.5, dist_out)
tmp_dist = dist_in + dist_out
# put back in original matrix if we cropped
if crop_idx is not None:
dist = np.min(tmp_dist) * np.ones(labels.shape, dtype='float32')
if n_dims == 3:
dist[crop_idx[0]:crop_idx[3], crop_idx[1]:crop_idx[4], crop_idx[2]:crop_idx[5], ...] = tmp_dist
elif n_dims == 2:
dist[crop_idx[0]:crop_idx[2], crop_idx[1]:crop_idx[3], ...] = tmp_dist
else:
dist = tmp_dist
return dist
# ------------------------------------------------- edit volumes in dir ------------------------------------------------
def mask_images_in_dir(image_dir, result_dir, mask_dir=None, threshold=0.1, dilate=0, erode=0, fill_holes=False,
masking_value=0, write_mask=False, mask_result_dir=None, recompute=True):
"""Mask all volumes in a folder, either with masks in a specified folder, or by keeping only the intensity values
above a specified threshold.
:param image_dir: path of directory with images to mask
:param result_dir: path of directory where masked images will be writen
:param mask_dir: (optional) path of directory containing masks. Masks are matched to images by sorting order.
Mask volumes don't have to be boolean or 0/1 arrays as all strictly positive values are used to build the masks.
Masks should have the same size as images. If images are multi-channel, masks can either be uni- or multi-channel.
In the first case, the same mask is applied to all channels.
:param threshold: (optional) If mask is None, masking is performed by keeping thresholding the input.
:param dilate: (optional) number of voxels by which to dilate the provided or computed masks.
:param erode: (optional) number of voxels by which to erode the provided or computed masks.
:param fill_holes: (optional) whether to fill the holes in the provided or computed masks.
:param masking_value: (optional) masking value
:param write_mask: (optional) whether to write the applied masks
:param mask_result_dir: (optional) path of resulting masks, if write_mask is True
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dir
utils.mkdir(result_dir)
if mask_result_dir is not None:
utils.mkdir(mask_result_dir)
# get path masks if necessary
path_images = utils.list_images_in_folder(image_dir)
if mask_dir is not None:
path_masks = utils.list_images_in_folder(mask_dir)
else:
path_masks = [None] * len(path_images)
# loop over images
loop_info = utils.LoopInfo(len(path_images), 10, 'masking', True)
for idx, (path_image, path_mask) in enumerate(zip(path_images, path_masks)):
loop_info.update(idx)
# mask images
path_result = os.path.join(result_dir, os.path.basename(path_image))
if (not os.path.isfile(path_result)) | recompute:
im, aff, h = utils.load_volume(path_image, im_only=False)
if path_mask is not None:
mask = utils.load_volume(path_mask)
else:
mask = None
im = mask_volume(im, mask, threshold, dilate, erode, fill_holes, masking_value, write_mask)
# write mask if necessary
if write_mask:
assert mask_result_dir is not None, 'if write_mask is True, mask_result_dir has to be specified as well'
mask_result_path = os.path.join(mask_result_dir, os.path.basename(path_image))
utils.save_volume(im[1], aff, h, mask_result_path)
utils.save_volume(im[0], aff, h, path_result)
else:
utils.save_volume(im, aff, h, path_result)
def rescale_images_in_dir(image_dir, result_dir,
new_min=0, new_max=255,
min_percentile=2, max_percentile=98, use_positive_only=True,
recompute=True):
"""This function linearly rescales all volumes in image_dir between new_min and new_max.
:param image_dir: path of directory with images to rescale
:param result_dir: path of directory where rescaled images will be writen
:param new_min: (optional) minimum value for the rescaled images.
:param new_max: (optional) maximum value for the rescaled images.
:param min_percentile: (optional) percentile for estimating robust minimum of volume (float in [0,...100]),
where 0 = np.min
:param max_percentile: (optional) percentile for estimating robust maximum of volume (float in [0,...100]),
where 100 = np.max
:param use_positive_only: (optional) whether to use only positive values when estimating the min and max percentile
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dir
utils.mkdir(result_dir)
# loop over images
path_images = utils.list_images_in_folder(image_dir)
loop_info = utils.LoopInfo(len(path_images), 10, 'rescaling', True)
for idx, path_image in enumerate(path_images):
loop_info.update(idx)
path_result = os.path.join(result_dir, os.path.basename(path_image))
if (not os.path.isfile(path_result)) | recompute:
im, aff, h = utils.load_volume(path_image, im_only=False)
im = rescale_volume(im, new_min, new_max, min_percentile, max_percentile, use_positive_only)
utils.save_volume(im, aff, h, path_result)
def crop_images_in_dir(image_dir, result_dir, cropping_margin=None, cropping_shape=None, recompute=True):
"""Crop all volumes in a folder by a given margin, or to a given shape.
:param image_dir: path of directory with images to rescale
:param result_dir: path of directory where cropped images will be writen
:param cropping_margin: (optional) margin by which to crop the volume.
Can be an int, a sequence or a 1d numpy array. Should be given if cropping_shape is None.
:param cropping_shape: (optional) shape to which the volume will be cropped.
Can be an int, a sequence or a 1d numpy array. Should be given if cropping_margin is None.
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dir
utils.mkdir(result_dir)
# loop over images and masks
path_images = utils.list_images_in_folder(image_dir)
loop_info = utils.LoopInfo(len(path_images), 10, 'cropping', True)
for idx, path_image in enumerate(path_images):
loop_info.update(idx)
# crop image
path_result = os.path.join(result_dir, os.path.basename(path_image))
if (not os.path.isfile(path_result)) | recompute:
volume, aff, h = utils.load_volume(path_image, im_only=False)
volume, aff = crop_volume(volume, cropping_margin, cropping_shape, aff)
utils.save_volume(volume, aff, h, path_result)
def crop_images_around_region_in_dir(image_dir,
result_dir,
mask_dir=None,
threshold=0.1,
masking_labels=None,
crop_margin=5,
recompute=True):
"""Crop all volumes in a folder around a region, which is defined for each volume by a mask obtained by either
1) directly providing it as input
2) thresholding the input volume
3) keeping a set of label values if the volume is a label map.
:param image_dir: path of directory with images to crop
:param result_dir: path of directory where cropped images will be writen
:param mask_dir: (optional) path of directory of input masks
:param threshold: (optional) lower bound to determine values to crop around
:param masking_labels: (optional) if the volume is a label map, it can be cropped around a given set of labels by
specifying them in masking_labels, which can either be a single int, a list or a 1d numpy array.
:param crop_margin: (optional) cropping margin
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dir
utils.mkdir(result_dir)
# list volumes and masks
path_images = utils.list_images_in_folder(image_dir)
if mask_dir is not None:
path_masks = utils.list_images_in_folder(mask_dir)
else:
path_masks = [None] * len(path_images)
# loop over images and masks
loop_info = utils.LoopInfo(len(path_images), 10, 'cropping', True)
for idx, (path_image, path_mask) in enumerate(zip(path_images, path_masks)):
loop_info.update(idx)
# crop image
path_result = os.path.join(result_dir, os.path.basename(path_image))
if (not os.path.isfile(path_result)) | recompute:
volume, aff, h = utils.load_volume(path_image, im_only=True)
if path_mask is not None:
mask = utils.load_volume(path_mask)
else:
mask = None
volume, cropping, aff = crop_volume_around_region(volume, mask, threshold, masking_labels, crop_margin, aff)
utils.save_volume(volume, aff, h, path_result)
def pad_images_in_dir(image_dir, result_dir, max_shape=None, padding_value=0, recompute=True):
"""Pads all the volumes in a folder to the same shape (either provided or computed).
:param image_dir: path of directory with images to pad
:param result_dir: path of directory where padded images will be writen
:param max_shape: (optional) shape to pad the volumes to. Can be an int, a sequence or a 1d numpy array.
If None, volumes will be padded to the shape of the biggest volume in image_dir.
:param padding_value: (optional) value to pad the volumes with.
:param recompute: (optional) whether to recompute result files even if they already exist
:return: shape of the padded volumes.
"""
# create result dir
utils.mkdir(result_dir)
# list labels
path_images = utils.list_images_in_folder(image_dir)
# get maximum shape
if max_shape is None:
max_shape, aff, _, _, h, _ = utils.get_volume_info(path_images[0])
for path_image in path_images[1:]:
image_shape, aff, _, _, h, _ = utils.get_volume_info(path_image)
max_shape = tuple(np.maximum(np.asarray(max_shape), np.asarray(image_shape)))
max_shape = np.array(max_shape)
# loop over label maps
loop_info = utils.LoopInfo(len(path_images), 10, 'padding', True)
for idx, path_image in enumerate(path_images):
loop_info.update(idx)
# pad map
path_result = os.path.join(result_dir, os.path.basename(path_image))
if (not os.path.isfile(path_result)) | recompute:
im, aff, h = utils.load_volume(path_image, im_only=False)
im, aff = pad_volume(im, max_shape, padding_value, aff)
utils.save_volume(im, aff, h, path_result)
return max_shape
def flip_images_in_dir(image_dir, result_dir, axis=None, direction=None, recompute=True):
"""Flip all images in a directory along a specified axis.
If unknown, this axis can be replaced by an anatomical direction.
:param image_dir: path of directory with images to flip
:param result_dir: path of directory where flipped images will be writen
:param axis: (optional) axis along which to flip the volume
:param direction: (optional) if axis is None, the volume can be flipped along an anatomical direction:
'rl' (right/left), 'ap' (anterior/posterior), 'si' (superior/inferior).
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dir
utils.mkdir(result_dir)
# loop over images
path_images = utils.list_images_in_folder(image_dir)
loop_info = utils.LoopInfo(len(path_images), 10, 'flipping', True)
for idx, path_image in enumerate(path_images):
loop_info.update(idx)
# flip image
path_result = os.path.join(result_dir, os.path.basename(path_image))
if (not os.path.isfile(path_result)) | recompute:
im, aff, h = utils.load_volume(path_image, im_only=False)
im = flip_volume(im, axis=axis, direction=direction, aff=aff)
utils.save_volume(im, aff, h, path_result)
def align_images_in_dir(image_dir, result_dir, aff_ref=None, path_ref=None, recompute=True):
"""This function aligns all images in image_dir to a reference orientation (axes and directions).
This reference orientation can be directly provided as an affine matrix, or can be specified by a reference volume.
If neither are provided, the reference orientation is assumed to be an identity matrix.
:param image_dir: path of directory with images to align
:param result_dir: path of directory where flipped images will be writen
:param aff_ref: (optional) reference affine matrix. Can be a numpy array, or the path to such array.
:param path_ref: (optional) path of a volume to which all images will be aligned. Can also be the path to a folder
with as many images as in image_dir, in which case each image in image_dir is aligned to its counterpart in path_ref
(they are matched by sorting order).
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dir
utils.mkdir(result_dir)
path_images = utils.list_images_in_folder(image_dir)
# read reference affine matrix
if path_ref is not None:
assert aff_ref is None, 'cannot provide aff_ref and path_ref together.'
basename = os.path.basename(path_ref)
if ('.nii.gz' in basename) | ('.nii' in basename) | ('.mgz' in basename) | ('.npz' in basename):
_, aff_ref, _ = utils.load_volume(path_ref, im_only=False)
path_refs = [None] * len(path_images)
else:
path_refs = utils.list_images_in_folder(path_ref)
elif aff_ref is not None:
aff_ref = utils.load_array_if_path(aff_ref)
path_refs = [None] * len(path_images)
else:
aff_ref = np.eye(4)
path_refs = [None] * len(path_images)
# loop over images
loop_info = utils.LoopInfo(len(path_images), 10, 'aligning', True)
for idx, (path_image, path_ref) in enumerate(zip(path_images, path_refs)):
loop_info.update(idx)
# align image
path_result = os.path.join(result_dir, os.path.basename(path_image))
if (not os.path.isfile(path_result)) | recompute:
im, aff, h = utils.load_volume(path_image, im_only=False)
if path_ref is not None:
_, aff_ref, _ = utils.load_volume(path_ref, im_only=False)
im, aff = align_volume_to_ref(im, aff, aff_ref=aff_ref, return_aff=True)
utils.save_volume(im, aff, h, path_result)
def correct_nans_images_in_dir(image_dir, result_dir, recompute=True):
"""Correct NaNs in all images in a directory.
:param image_dir: path of directory with images to correct
:param result_dir: path of directory where corrected images will be writen
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dir
utils.mkdir(result_dir)
# loop over images
path_images = utils.list_images_in_folder(image_dir)
loop_info = utils.LoopInfo(len(path_images), 10, 'correcting', True)
for idx, path_image in enumerate(path_images):
loop_info.update(idx)
# flip image
path_result = os.path.join(result_dir, os.path.basename(path_image))
if (not os.path.isfile(path_result)) | recompute:
im, aff, h = utils.load_volume(path_image, im_only=False)
im[np.isnan(im)] = 0
utils.save_volume(im, aff, h, path_result)
def blur_images_in_dir(image_dir, result_dir, sigma, mask_dir=None, gpu=False, recompute=True):
"""This function blurs all the images in image_dir with kernels of the specified std deviations.
:param image_dir: path of directory with images to blur
:param result_dir: path of directory where blurred images will be writen
:param sigma: standard deviation of the blurring gaussian kernels.
Can be a number (isotropic blurring), or a sequence with the same length as the number of dimensions of images.
:param mask_dir: (optional) path of directory with masks of the region to blur.
Images and masks are matched by sorting order.
:param gpu: (optional) whether to use a fast gpu model for blurring
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dir
utils.mkdir(result_dir)
# list images and masks
path_images = utils.list_images_in_folder(image_dir)
if mask_dir is not None:
path_masks = utils.list_images_in_folder(mask_dir)
else:
path_masks = [None] * len(path_images)
# loop over images
previous_model_input_shape = None
model = None
loop_info = utils.LoopInfo(len(path_images), 10, 'blurring', True)
for idx, (path_image, path_mask) in enumerate(zip(path_images, path_masks)):
loop_info.update(idx)
# load image
path_result = os.path.join(result_dir, os.path.basename(path_image))
if (not os.path.isfile(path_result)) | recompute:
im, im_shape, aff, n_dims, _, h, _ = utils.get_volume_info(path_image, return_volume=True)
if path_mask is not None:
mask = utils.load_volume(path_mask)
assert mask.shape == im.shape, 'mask and image should have the same shape'
else:
mask = None
# blur image
if gpu:
if (im_shape != previous_model_input_shape) | (model is None):
previous_model_input_shape = im_shape
inputs = [KL.Input(shape=im_shape + [1])]
sigma = utils.reformat_to_list(sigma, length=n_dims)
if mask is None:
image = GaussianBlur(sigma=sigma)(inputs[0])
else:
inputs.append(KL.Input(shape=im_shape + [1], dtype='float32'))
image = GaussianBlur(sigma=sigma, use_mask=True)(inputs)
model = Model(inputs=inputs, outputs=image)
if mask is None:
im = np.squeeze(model.predict(utils.add_axis(im, axis=[0, -1])))
else:
im = np.squeeze(model.predict([utils.add_axis(im, [0, -1]), utils.add_axis(mask, [0, -1])]))
else:
im = blur_volume(im, sigma, mask=mask)
utils.save_volume(im, aff, h, path_result)
def create_mutlimodal_images(list_channel_dir, result_dir, recompute=True):
"""This function forms multimodal images by stacking channels located in different folders.
:param list_channel_dir: list of all directories, each containing the same channel for all images.
Channels are matched between folders by sorting order.
:param result_dir: path of directory where multimodal images will be writen
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dir
utils.mkdir(result_dir)
assert isinstance(list_channel_dir, (list, tuple)), 'list_channel_dir should be a list or a tuple'
# gather path of all images for all channels
list_channel_paths = [utils.list_images_in_folder(d) for d in list_channel_dir]
n_images = len(list_channel_paths[0])
n_channels = len(list_channel_dir)
for channel_paths in list_channel_paths:
if len(channel_paths) != n_images:
raise ValueError('all directories should have the same number of files')
# loop over images
loop_info = utils.LoopInfo(n_images, 10, 'processing', True)
for idx in range(n_images):
loop_info.update(idx)
# stack all channels and save multichannel image
path_result = os.path.join(result_dir, os.path.basename(list_channel_paths[0][idx]))
if (not os.path.isfile(path_result)) | recompute:
list_channels = list()
tmp_aff = None
tmp_h = None
for channel_idx in range(n_channels):
tmp_channel, tmp_aff, tmp_h = utils.load_volume(list_channel_paths[channel_idx][idx], im_only=False)
list_channels.append(tmp_channel)
im = np.stack(list_channels, axis=-1)
utils.save_volume(im, tmp_aff, tmp_h, path_result)
def convert_images_in_dir_to_nifty(image_dir, result_dir, aff=None, ref_aff_dir=None, recompute=True):
"""Converts all images in image_dir to nifty format.
:param image_dir: path of directory with images to convert
:param result_dir: path of directory where converted images will be writen
:param aff: (optional) affine matrix in homogeneous coordinates with which to write the images.
Can also be 'FS' to write images with FreeSurfer typical affine matrix.
:param ref_aff_dir: (optional) alternatively to providing a fixed aff, different affine matrices can be used for
each image in image_dir by matching them to corresponding volumes contained in ref_aff_dir.
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dir
utils.mkdir(result_dir)
# list images
path_images = utils.list_images_in_folder(image_dir)
if ref_aff_dir is not None:
path_ref_images = utils.list_images_in_folder(ref_aff_dir)
else:
path_ref_images = [None] * len(path_images)
# loop over images
loop_info = utils.LoopInfo(len(path_images), 10, 'converting', True)
for idx, (path_image, path_ref) in enumerate(zip(path_images, path_ref_images)):
loop_info.update(idx)
# convert images to nifty format
path_result = os.path.join(result_dir, os.path.basename(utils.strip_extension(path_image))) + '.nii.gz'
if (not os.path.isfile(path_result)) | recompute:
if utils.get_image_extension(path_image) == 'nii.gz':
shutil.copy2(path_image, path_result)
else:
im, tmp_aff, h = utils.load_volume(path_image, im_only=False)
if aff is not None:
tmp_aff = aff
elif path_ref is not None:
_, tmp_aff, h = utils.load_volume(path_ref, im_only=False)
utils.save_volume(im, tmp_aff, h, path_result)
def mri_convert_images_in_dir(image_dir,
result_dir,
interpolation=None,
reference_dir=None,
same_reference=False,
voxsize=None,
path_freesurfer='/usr/local/freesurfer',
mri_convert_path='/usr/local/freesurfer/bin/mri_convert',
recompute=True):
"""This function launches mri_convert on all images contained in image_dir, and writes the results in result_dir.
The interpolation type can be specified (i.e. 'nearest'), as well as a folder containing references for resampling.
reference_dir can be the path of a single *image* if same_reference=True.
:param image_dir: path of directory with images to convert
:param result_dir: path of directory where converted images will be writen
:param interpolation: (optional) interpolation type, can be 'inter' (default), 'cubic', 'nearest', 'trilinear'
:param reference_dir: (optional) path of directory with reference images. References are matched to images by
sorting order. If same_reference is false, references and images are matched by sorting order.
This can also be the path to a single image that will be used as reference for all images im image_dir (set
same_reference to true in that case).
:param same_reference: (optional) whether to use a single image as reference for all images to interpolate.
:param voxsize: (optional) resolution at which to resample converted image. Must be a list of length n_dims.
:param path_freesurfer: (optional) path FreeSurfer home
:param mri_convert_path: (optional) path mri_convert binary file
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dir
utils.mkdir(result_dir)
# set up FreeSurfer
os.environ['FREESURFER_HOME'] = path_freesurfer
os.system(os.path.join(path_freesurfer, 'SetUpFreeSurfer.sh'))
mri_convert = mri_convert_path + ' '
# list images
path_images = utils.list_images_in_folder(image_dir)
if reference_dir is not None:
if same_reference:
path_references = [reference_dir] * len(path_images)
else:
path_references = utils.list_images_in_folder(reference_dir)
assert len(path_references) == len(path_images), 'different number of files in image_dir and reference_dir'
else:
path_references = [None] * len(path_images)
# loop over images
loop_info = utils.LoopInfo(len(path_images), 10, 'converting', True)
for idx, (path_image, path_reference) in enumerate(zip(path_images, path_references)):
loop_info.update(idx)
# convert image
path_result = os.path.join(result_dir, os.path.basename(path_image))
if (not os.path.isfile(path_result)) | recompute:
cmd = mri_convert + path_image + ' ' + path_result + ' -odt float'
if interpolation is not None:
cmd += ' -rt ' + interpolation
if reference_dir is not None:
cmd += ' -rl ' + path_reference
if voxsize is not None:
voxsize = utils.reformat_to_list(voxsize, dtype='float')
cmd += ' --voxsize ' + ' '.join([str(np.around(v, 3)) for v in voxsize])
os.system(cmd)
def samseg_images_in_dir(image_dir,
result_dir,
atlas_dir=None,
threads=4,
path_freesurfer='/usr/local/freesurfer',
keep_segm_only=True,
recompute=True):
"""This function launches samseg for all images contained in image_dir and writes the results in result_dir.
If keep_segm_only=True, the result segmentation is copied in result_dir and SAMSEG's intermediate result dir is
deleted.
:param image_dir: path of directory with input images
:param result_dir: path of directory where processed images folders (if keep_segm_only is False),
or samseg segmentation (if keep_segm_only is True) will be writen
:param atlas_dir: (optional) path of samseg atlas directory. If None, use samseg default atlas.
:param threads: (optional) number of threads to use
:param path_freesurfer: (optional) path FreeSurfer home
:param keep_segm_only: (optional) whether to keep samseg result folders, or only samseg segmentations.
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dir
utils.mkdir(result_dir)
# set up FreeSurfer
os.environ['FREESURFER_HOME'] = path_freesurfer
os.system(os.path.join(path_freesurfer, 'SetUpFreeSurfer.sh'))
path_samseg = os.path.join(path_freesurfer, 'bin', 'run_samseg')
# loop over images
path_images = utils.list_images_in_folder(image_dir)
loop_info = utils.LoopInfo(len(path_images), 10, 'processing', True)
for idx, path_image in enumerate(path_images):
loop_info.update(idx)
# build path_result
path_im_result_dir = os.path.join(result_dir, utils.strip_extension(os.path.basename(path_image)))
path_samseg_result = os.path.join(path_im_result_dir, 'seg.mgz')
if keep_segm_only:
path_result = os.path.join(result_dir, utils.strip_extension(os.path.basename(path_image)) + '_seg.mgz')
else:
path_result = path_samseg_result
# run samseg
if (not os.path.isfile(path_result)) | recompute:
cmd = utils.mkcmd(path_samseg, '-i', path_image, '-o', path_im_result_dir, '--threads', threads)
if atlas_dir is not None:
cmd = utils.mkcmd(cmd, '-a', atlas_dir)
os.system(cmd)
# move segmentation to result_dir if necessary
if keep_segm_only:
if os.path.isfile(path_samseg_result):
shutil.move(path_samseg_result, path_result)
if os.path.isdir(path_im_result_dir):
shutil.rmtree(path_im_result_dir)
def niftyreg_images_in_dir(image_dir,
reference_dir,
nifty_reg_function='reg_resample',
input_transformation_dir=None,
result_dir=None,
result_transformation_dir=None,
interpolation=None,
same_floating=False,
same_reference=False,
same_transformation=False,
path_nifty_reg='/home/benjamin/Softwares/niftyreg-gpu/build/reg-apps',
recompute=True):
"""This function launches one of niftyreg functions (reg_aladin, reg_f3d, reg_resample) on all images contained
in image_dir.
:param image_dir: path of directory with images to register. Can also be a single image, in that case set
same_floating to True.
:param reference_dir: path of directory with reference images. If same_reference is false, references and images are
matched by sorting order. This can also be the path to a single image that will be used as reference for all images
im image_dir (set same_reference to True in that case).
:param nifty_reg_function: (optional) name of the niftyreg function to use. Can be 'reg_aladin', 'reg_f3d', or
'reg_resample'. Default is 'reg_resample'.
:param input_transformation_dir: (optional) path of a directory containing all the input transformation (for
reg_resample, or reg_f3d). Can also be the path to a single transformation that will be used for all images
in image_dir (set same_transformation to True in that case).
:param result_dir: path of directory where output images will be writen.
:param result_transformation_dir: path of directory where resulting transformations will be writen (for
reg_aladin and reg_f3d).
:param interpolation: (optional) integer describing the order of the interpolation to apply (0 = nearest neighbours)
:param same_floating: (optional) set to true if only one image is used as floating image.
:param same_reference: (optional) whether to use a single image as reference for all input images.
:param same_transformation: (optional) whether to apply the same transformation to all floating images.
:param path_nifty_reg: (optional) path of the folder containing nifty-reg functions
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dirs
if result_dir is not None:
utils.mkdir(result_dir)
if result_transformation_dir is not None:
utils.mkdir(result_transformation_dir)
nifty_reg = os.path.join(path_nifty_reg, nifty_reg_function)
# list reference and floating images
path_images = utils.list_images_in_folder(image_dir)
path_references = utils.list_images_in_folder(reference_dir)
if same_reference:
path_references = utils.reformat_to_list(path_references, length=len(path_images))
if same_floating:
path_images = utils.reformat_to_list(path_images, length=len(path_references))
assert len(path_references) == len(path_images), 'different number of files in image_dir and reference_dir'
# list input transformations
if input_transformation_dir is not None:
if same_transformation:
path_input_transfs = utils.reformat_to_list(input_transformation_dir, length=len(path_images))
else:
path_input_transfs = utils.list_files(input_transformation_dir)
assert len(path_input_transfs) == len(path_images), 'different number of transformations and images'
else:
path_input_transfs = [None] * len(path_images)
# define flag input trans
if input_transformation_dir is not None:
if nifty_reg_function == 'reg_aladin':
flag_input_trans = '-inaff'
elif nifty_reg_function == 'reg_f3d':
flag_input_trans = '-aff'
elif nifty_reg_function == 'reg_resample':
flag_input_trans = '-trans'
else:
raise Exception('nifty_reg_function can only be "reg_aladin", "reg_f3d", or "reg_resample"')
else:
flag_input_trans = None
# define flag result transformation
if result_transformation_dir is not None:
if nifty_reg_function == 'reg_aladin':
flag_result_trans = '-aff'
elif nifty_reg_function == 'reg_f3d':
flag_result_trans = '-cpp'
else:
raise Exception('result_transformation_dir can only be used with "reg_aladin" or "reg_f3d"')
else:
flag_result_trans = None
# loop over images
loop_info = utils.LoopInfo(len(path_images), 10, 'processing', True)
for idx, (path_image, path_ref, path_input_trans) in enumerate(zip(path_images,
path_references,
path_input_transfs)):
loop_info.update(idx)
# define path registered image
name = os.path.basename(path_ref) if same_floating else os.path.basename(path_image)
if result_dir is not None:
path_result = os.path.join(result_dir, name)
result_already_computed = os.path.isfile(path_result)
else:
path_result = None
result_already_computed = True
# define path resulting transformation
if result_transformation_dir is not None:
if nifty_reg_function == 'reg_aladin':
path_result_trans = os.path.join(result_transformation_dir, utils.strip_extension(name) + '.txt')
result_trans_already_computed = os.path.isfile(path_result_trans)
else:
path_result_trans = os.path.join(result_transformation_dir, name)
result_trans_already_computed = os.path.isfile(path_result_trans)
else:
path_result_trans = None
result_trans_already_computed = True
if (not result_already_computed) | (not result_trans_already_computed) | recompute:
# build main command
cmd = utils.mkcmd(nifty_reg, '-ref', path_ref, '-flo', path_image, '-pad 0')
# add options
if path_result is not None:
cmd = utils.mkcmd(cmd, '-res', path_result)
if flag_input_trans is not None:
cmd = utils.mkcmd(cmd, flag_input_trans, path_input_trans)
if flag_result_trans is not None:
cmd = utils.mkcmd(cmd, flag_result_trans, path_result_trans)
if interpolation is not None:
cmd = utils.mkcmd(cmd, '-inter', interpolation)
# execute
os.system(cmd)
def upsample_anisotropic_images(image_dir,
resample_image_result_dir,
resample_like_dir,
path_freesurfer='/usr/local/freesurfer/',
recompute=True):
"""This function takes as input a set of LR images and resample them to HR with respect to reference images.
:param image_dir: path of directory with input images (only uni-modal images supported)
:param resample_image_result_dir: path of directory where resampled images will be writen
:param resample_like_dir: path of directory with reference images.
:param path_freesurfer: (optional) path freesurfer home, as this function uses mri_convert
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dir
utils.mkdir(resample_image_result_dir)
# set up FreeSurfer
os.environ['FREESURFER_HOME'] = path_freesurfer
os.system(os.path.join(path_freesurfer, 'SetUpFreeSurfer.sh'))
mri_convert = os.path.join(path_freesurfer, 'bin/mri_convert')
# list images and labels
path_images = utils.list_images_in_folder(image_dir)
path_ref_images = utils.list_images_in_folder(resample_like_dir)
assert len(path_images) == len(path_ref_images), \
'the folders containing the images and their references are not the same size'
# loop over images
loop_info = utils.LoopInfo(len(path_images), 10, 'upsampling', True)
for idx, (path_image, path_ref) in enumerate(zip(path_images, path_ref_images)):
loop_info.update(idx)
# upsample image
_, _, n_dims, _, _, image_res = utils.get_volume_info(path_image, return_volume=False)
path_im_upsampled = os.path.join(resample_image_result_dir, os.path.basename(path_image))
if (not os.path.isfile(path_im_upsampled)) | recompute:
cmd = utils.mkcmd(mri_convert, path_image, path_im_upsampled, '-rl', path_ref, '-odt float')
os.system(cmd)
path_dist_map = os.path.join(resample_image_result_dir, 'dist_map_' + os.path.basename(path_image))
if (not os.path.isfile(path_dist_map)) | recompute:
im, aff, h = utils.load_volume(path_image, im_only=False)
dist_map = np.meshgrid(*[np.arange(s) for s in im.shape], indexing='ij')
tmp_dir = utils.strip_extension(path_im_upsampled) + '_meshes'
utils.mkdir(tmp_dir)
path_meshes_up = list()
for (i, maps) in enumerate(dist_map):
path_mesh = os.path.join(tmp_dir, '%s_' % i + os.path.basename(path_image))
path_mesh_up = os.path.join(tmp_dir, 'up_%s_' % i + os.path.basename(path_image))
utils.save_volume(maps, aff, h, path_mesh)
cmd = utils.mkcmd(mri_convert, path_mesh, path_mesh_up, '-rl', path_im_upsampled, '-odt float')
os.system(cmd)
path_meshes_up.append(path_mesh_up)
mesh_up_0, aff, h = utils.load_volume(path_meshes_up[0], im_only=False)
mesh_up = np.stack([mesh_up_0] + [utils.load_volume(p) for p in path_meshes_up[1:]], -1)
shutil.rmtree(tmp_dir)
floor = np.floor(mesh_up)
ceil = np.ceil(mesh_up)
f_dist = mesh_up - floor
c_dist = ceil - mesh_up
dist = np.minimum(f_dist, c_dist) * utils.add_axis(image_res, axis=[0] * n_dims)
dist = np.sqrt(np.sum(dist ** 2, axis=-1))
utils.save_volume(dist, aff, h, path_dist_map)
def simulate_upsampled_anisotropic_images(image_dir,
downsample_image_result_dir,
resample_image_result_dir,
data_res,
labels_dir=None,
downsample_labels_result_dir=None,
slice_thickness=None,
build_dist_map=False,
path_freesurfer='/usr/local/freesurfer/',
gpu=True,
recompute=True):
"""This function takes as input a set of HR images and creates two datasets with it:
1) a set of LR images obtained by downsampling the HR images with nearest neighbour interpolation,
2) a set of HR images obtained by resampling the LR images to native HR with linear interpolation.
Additionally, this function can also create a set of LR labels from label maps corresponding to the input images.
:param image_dir: path of directory with input images (only uni-model images supported)
:param downsample_image_result_dir: path of directory where downsampled images will be writen
:param resample_image_result_dir: path of directory where resampled images will be writen
:param data_res: resolution of LR images. Can either be: an int, a float, a list or a numpy array.
:param labels_dir: (optional) path of directory with label maps corresponding to input images
:param downsample_labels_result_dir: (optional) path of directory where downsampled label maps will be writen
:param slice_thickness: (optional) thickness of slices to simulate. Can be a number, a list or a numpy array.
:param build_dist_map: (optional) whether to return the resampled images with an additional channel indicating the
distance of each voxel to the nearest acquired voxel. Default is False.
:param path_freesurfer: (optional) path freesurfer home, as this function uses mri_convert
:param gpu: (optional) whether to use a fast gpu model for blurring
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dir
utils.mkdir(resample_image_result_dir)
utils.mkdir(downsample_image_result_dir)
if labels_dir is not None:
assert downsample_labels_result_dir is not None, \
'downsample_labels_result_dir should not be None if labels_dir is specified'
utils.mkdir(downsample_labels_result_dir)
# set up FreeSurfer
os.environ['FREESURFER_HOME'] = path_freesurfer
os.system(os.path.join(path_freesurfer, 'SetUpFreeSurfer.sh'))
mri_convert = os.path.join(path_freesurfer, 'bin/mri_convert')
# list images and labels
path_images = utils.list_images_in_folder(image_dir)
path_labels = [None] * len(path_images) if labels_dir is None else utils.list_images_in_folder(labels_dir)
# initialisation
_, _, n_dims, _, _, image_res = utils.get_volume_info(path_images[0], return_volume=False, aff_ref=np.eye(4))
data_res = np.squeeze(utils.reformat_to_n_channels_array(data_res, n_dims, n_channels=1))
slice_thickness = utils.reformat_to_list(slice_thickness, length=n_dims)
# loop over images
previous_model_input_shape = None
model = None
loop_info = utils.LoopInfo(len(path_images), 10, 'processing', True)
for idx, (path_image, path_labels) in enumerate(zip(path_images, path_labels)):
loop_info.update(idx)
# downsample image
path_im_downsampled = os.path.join(downsample_image_result_dir, os.path.basename(path_image))
if (not os.path.isfile(path_im_downsampled)) | recompute:
im, _, aff, n_dims, _, h, image_res = utils.get_volume_info(path_image, return_volume=True)
im, aff_aligned = align_volume_to_ref(im, aff, aff_ref=np.eye(4), return_aff=True, n_dims=n_dims)
im_shape = list(im.shape[:n_dims])
sigma = blurring_sigma_for_downsampling(image_res, data_res, thickness=slice_thickness)
sigma = [0 if data_res[i] == image_res[i] else sigma[i] for i in range(n_dims)]
# blur image
if gpu:
if (im_shape != previous_model_input_shape) | (model is None):
previous_model_input_shape = im_shape
image_in = KL.Input(shape=im_shape + [1])
image = GaussianBlur(sigma=sigma)(image_in)
model = Model(inputs=image_in, outputs=image)
im = np.squeeze(model.predict(utils.add_axis(im, axis=[0, -1])))
else:
im = blur_volume(im, sigma, mask=None)
utils.save_volume(im, aff_aligned, h, path_im_downsampled)
# downsample blurred image
voxsize = ' '.join([str(r) for r in data_res])
cmd = utils.mkcmd(mri_convert, path_im_downsampled, path_im_downsampled, '--voxsize', voxsize,
'-odt float -rt nearest')
os.system(cmd)
# downsample labels if necessary
if path_labels is not None:
path_lab_downsampled = os.path.join(downsample_labels_result_dir, os.path.basename(path_labels))
if (not os.path.isfile(path_lab_downsampled)) | recompute:
cmd = utils.mkcmd(mri_convert, path_labels, path_lab_downsampled, '-rl', path_im_downsampled,
'-odt float -rt nearest')
os.system(cmd)
# upsample image
path_im_upsampled = os.path.join(resample_image_result_dir, os.path.basename(path_image))
if (not os.path.isfile(path_im_upsampled)) | recompute:
cmd = utils.mkcmd(mri_convert, path_im_downsampled, path_im_upsampled, '-rl', path_image, '-odt float')
os.system(cmd)
if build_dist_map:
path_dist_map = os.path.join(resample_image_result_dir, 'dist_map_' + os.path.basename(path_image))
if (not os.path.isfile(path_dist_map)) | recompute:
im, aff, h = utils.load_volume(path_im_downsampled, im_only=False)
dist_map = np.meshgrid(*[np.arange(s) for s in im.shape], indexing='ij')
tmp_dir = utils.strip_extension(path_im_downsampled) + '_meshes'
utils.mkdir(tmp_dir)
path_meshes_up = list()
for (i, d_map) in enumerate(dist_map):
path_mesh = os.path.join(tmp_dir, '%s_' % i + os.path.basename(path_image))
path_mesh_up = os.path.join(tmp_dir, 'up_%s_' % i + os.path.basename(path_image))
utils.save_volume(d_map, aff, h, path_mesh)
cmd = utils.mkcmd(mri_convert, path_mesh, path_mesh_up, '-rl', path_image, '-odt float')
os.system(cmd)
path_meshes_up.append(path_mesh_up)
mesh_up_0, aff, h = utils.load_volume(path_meshes_up[0], im_only=False)
mesh_up = np.stack([mesh_up_0] + [utils.load_volume(p) for p in path_meshes_up[1:]], -1)
shutil.rmtree(tmp_dir)
floor = np.floor(mesh_up)
ceil = np.ceil(mesh_up)
f_dist = mesh_up - floor
c_dist = ceil - mesh_up
dist = np.minimum(f_dist, c_dist) * utils.add_axis(data_res, axis=[0] * n_dims)
dist = np.sqrt(np.sum(dist ** 2, axis=-1))
utils.save_volume(dist, aff, h, path_dist_map)
def check_images_in_dir(image_dir, check_values=False, keep_unique=True, max_channels=10, verbose=True):
"""Check if all volumes within the same folder share the same characteristics: shape, affine matrix, resolution.
Also have option to check if all volumes have the same intensity values (useful for label maps).
:return four lists, each containing the different values detected for a specific parameter among those to check."""
# define information to check
list_shape = list()
list_aff = list()
list_res = list()
list_axes = list()
if check_values:
list_unique_values = list()
else:
list_unique_values = None
# loop through files
path_images = utils.list_images_in_folder(image_dir)
loop_info = utils.LoopInfo(len(path_images), 10, 'checking', verbose) if verbose else None
for idx, path_image in enumerate(path_images):
if loop_info is not None:
loop_info.update(idx)
# get info
im, shape, aff, n_dims, _, h, res = utils.get_volume_info(path_image, True, np.eye(4), max_channels)
axes = get_ras_axes(aff, n_dims=n_dims).tolist()
aff[:, np.arange(n_dims)] = aff[:, axes]
aff = (np.int32(np.round(np.array(aff[:3, :3]), 2) * 100) / 100).tolist()
res = (np.int32(np.round(np.array(res), 2) * 100) / 100).tolist()
# add values to list if not already there
if (shape not in list_shape) | (not keep_unique):
list_shape.append(shape)
if (aff not in list_aff) | (not keep_unique):
list_aff.append(aff)
if (res not in list_res) | (not keep_unique):
list_res.append(res)
if (axes not in list_axes) | (not keep_unique):
list_axes.append(axes)
if list_unique_values is not None:
uni = np.unique(im).tolist()
if (uni not in list_unique_values) | (not keep_unique):
list_unique_values.append(uni)
return list_shape, list_aff, list_res, list_axes, list_unique_values
# ----------------------------------------------- edit label maps in dir -----------------------------------------------
def correct_labels_in_dir(labels_dir, results_dir, incorrect_labels, correct_labels=None,
use_nearest_label=False, remove_zero=False, smooth=False, recompute=True):
"""This function corrects label values for all label maps in a folder with either
- a list a given values,
- or with the nearest label value.
:param labels_dir: path of directory with input label maps
:param results_dir: path of directory where corrected label maps will be writen
:param incorrect_labels: list of all label values to correct (e.g. [1, 2, 3, 4]).
:param correct_labels: (optional) list of correct label values to replace the incorrect ones.
Correct values must have the same order as their corresponding value in list_incorrect_labels.
When several correct values are possible for the same incorrect value, the nearest correct value will be selected at
each voxel to correct. In that case, the different correct values must be specified inside a list within
list_correct_labels (e.g. [10, 20, 30, [40, 50]).
:param use_nearest_label: (optional) whether to correct the incorrect label values with the nearest labels.
:param remove_zero: (optional) if use_nearest_label is True, set to True not to consider zero among the potential
candidates for the nearest neighbour.
:param smooth: (optional) whether to smooth the corrected label maps
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dir
utils.mkdir(results_dir)
# prepare data files
path_labels = utils.list_images_in_folder(labels_dir)
loop_info = utils.LoopInfo(len(path_labels), 10, 'correcting', True)
for idx, path_label in enumerate(path_labels):
loop_info.update(idx)
# correct labels
path_result = os.path.join(results_dir, os.path.basename(path_label))
if (not os.path.isfile(path_result)) | recompute:
im, aff, h = utils.load_volume(path_label, im_only=False, dtype='int32')
im = correct_label_map(im, incorrect_labels, correct_labels, use_nearest_label, remove_zero, smooth)
utils.save_volume(im, aff, h, path_result)
def mask_labels_in_dir(labels_dir, result_dir, values_to_keep, masking_value=0, mask_result_dir=None, recompute=True):
"""This function masks all label maps in a folder by keeping a set of given label values.
:param labels_dir: path of directory with input label maps
:param result_dir: path of directory where corrected label maps will be writen
:param values_to_keep: list of values for masking the label maps.
:param masking_value: (optional) value to mask the label maps with
:param mask_result_dir: (optional) path of directory where applied masks will be writen
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dir
utils.mkdir(result_dir)
if mask_result_dir is not None:
utils.mkdir(mask_result_dir)
# reformat values to keep
values_to_keep = utils.reformat_to_list(values_to_keep, load_as_numpy=True)
# loop over labels
path_labels = utils.list_images_in_folder(labels_dir)
loop_info = utils.LoopInfo(len(path_labels), 10, 'masking', True)
for idx, path_label in enumerate(path_labels):
loop_info.update(idx)
# mask labels
path_result = os.path.join(result_dir, os.path.basename(path_label))
if mask_result_dir is not None:
path_result_mask = os.path.join(mask_result_dir, os.path.basename(path_label))
else:
path_result_mask = ''
if (not os.path.isfile(path_result)) | \
(mask_result_dir is not None) & (not os.path.isfile(path_result_mask)) | \
recompute:
lab, aff, h = utils.load_volume(path_label, im_only=False)
if mask_result_dir is not None:
labels, mask = mask_label_map(lab, values_to_keep, masking_value, return_mask=True)
path_result_mask = os.path.join(mask_result_dir, os.path.basename(path_label))
utils.save_volume(mask, aff, h, path_result_mask)
else:
labels = mask_label_map(lab, values_to_keep, masking_value, return_mask=False)
utils.save_volume(labels, aff, h, path_result)
def smooth_labels_in_dir(labels_dir, result_dir, gpu=False, labels_list=None, connectivity=1, recompute=True):
"""Smooth all label maps in a folder by replacing each voxel by the value of its most numerous neighbours.
:param labels_dir: path of directory with input label maps
:param result_dir: path of directory where smoothed label maps will be writen
:param gpu: (optional) whether to use a gpu implementation for faster processing
:param labels_list: (optional) if gpu is True, path of numpy array with all label values.
Automatically computed if not provided.
:param connectivity: (optional) connectivity to use when smoothing the label maps
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dir
utils.mkdir(result_dir)
# list label maps
path_labels = utils.list_images_in_folder(labels_dir)
if labels_list is not None:
labels_list, _ = utils.get_list_labels(label_list=labels_list, FS_sort=True)
if gpu:
# initialisation
previous_model_input_shape = None
smoothing_model = None
# loop over label maps
loop_info = utils.LoopInfo(len(path_labels), 10, 'smoothing', True)
for idx, path_label in enumerate(path_labels):
loop_info.update(idx)
# smooth label map
path_result = os.path.join(result_dir, os.path.basename(path_label))
if (not os.path.isfile(path_result)) | recompute:
labels, label_shape, aff, n_dims, _, h, _ = utils.get_volume_info(path_label, return_volume=True)
if label_shape != previous_model_input_shape:
previous_model_input_shape = label_shape
smoothing_model = smoothing_gpu_model(label_shape, labels_list, connectivity)
unique_labels = np.unique(labels).astype('int32')
if labels_list is None:
smoothed_labels = smoothing_model.predict(utils.add_axis(labels))
else:
labels_to_keep = [lab for lab in unique_labels if lab not in labels_list]
new_labels, mask_new_labels = mask_label_map(labels, labels_to_keep, return_mask=True)
smoothed_labels = np.squeeze(smoothing_model.predict(utils.add_axis(labels)))
smoothed_labels = np.where(mask_new_labels, new_labels, smoothed_labels)
mask_new_zeros = (labels > 0) & (smoothed_labels == 0)
smoothed_labels[mask_new_zeros] = labels[mask_new_zeros]
utils.save_volume(smoothed_labels, aff, h, path_result, dtype='int32')
else:
# build kernel
_, _, n_dims, _, _, _ = utils.get_volume_info(path_labels[0])
kernel = utils.build_binary_structure(connectivity, n_dims, shape=n_dims)
# loop over label maps
loop_info = utils.LoopInfo(len(path_labels), 10, 'smoothing', True)
for idx, path in enumerate(path_labels):
loop_info.update(idx)
# smooth label map
path_result = os.path.join(result_dir, os.path.basename(path))
if (not os.path.isfile(path_result)) | recompute:
volume, aff, h = utils.load_volume(path, im_only=False)
new_volume = smooth_label_map(volume, kernel, labels_list)
utils.save_volume(new_volume, aff, h, path_result, dtype='int32')
def smoothing_gpu_model(label_shape, label_list, connectivity=1):
"""This function builds a gpu model in keras with a tensorflow backend to smooth label maps.
This model replaces each voxel of the input by the value of its most numerous neighbour.
:param label_shape: shape of the label map
:param label_list: list of all labels to consider
:param connectivity: (optional) connectivity to use when smoothing the label maps
:return: gpu smoothing model
"""
# convert labels so values are in [0, ..., N-1] and use one hot encoding
n_labels = label_list.shape[0]
labels_in = KL.Input(shape=label_shape, name='lab_input', dtype='int32')
labels = ConvertLabels(label_list)(labels_in)
labels = KL.Lambda(lambda x: tf.one_hot(tf.cast(x, dtype='int32'), depth=n_labels, axis=-1))(labels)
# count neighbouring voxels
n_dims, _ = utils.get_dims(label_shape)
k = utils.add_axis(utils.build_binary_structure(connectivity, n_dims, shape=n_dims), axis=[-1, -1])
kernel = KL.Lambda(lambda x: tf.convert_to_tensor(k, dtype='float32'))([])
split = KL.Lambda(lambda x: tf.split(x, [1] * n_labels, axis=-1))(labels)
labels = KL.Lambda(lambda x: tf.nn.convolution(x[0], x[1], padding='SAME'))([split[0], kernel])
for i in range(1, n_labels):
tmp = KL.Lambda(lambda x: tf.nn.convolution(x[0], x[1], padding='SAME'))([split[i], kernel])
labels = KL.Lambda(lambda x: tf.concat([x[0], x[1]], -1))([labels, tmp])
# take the argmax and convert labels to original values
labels = KL.Lambda(lambda x: tf.math.argmax(x, -1))(labels)
labels = ConvertLabels(np.arange(n_labels), label_list)(labels)
return Model(inputs=labels_in, outputs=labels)
def erode_labels_in_dir(labels_dir, result_dir, labels_to_erode, erosion_factors=1., gpu=False, recompute=True):
"""Erode a given set of label values for all label maps in a folder.
:param labels_dir: path of directory with input label maps
:param result_dir: path of directory where cropped label maps will be writen
:param labels_to_erode: list of label values to erode
:param erosion_factors: (optional) list of erosion factors to use for each label value. If values are integers,
normal erosion applies. If float, we first 1) blur a mask of the corresponding label value with a gpu model,
and 2) use the erosion factor as a threshold in the blurred mask.
If erosion_factors is a single value, the same factor will be applied to all labels.
:param gpu: (optional) whether to use a fast gpu model for blurring (if erosion factors are floats)
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# create result dir
utils.mkdir(result_dir)
# loop over label maps
model = None
path_labels = utils.list_images_in_folder(labels_dir)
loop_info = utils.LoopInfo(len(path_labels), 5, 'eroding', True)
for idx, path_label in enumerate(path_labels):
loop_info.update(idx)
# erode label map
labels, aff, h = utils.load_volume(path_label, im_only=False)
path_result = os.path.join(result_dir, os.path.basename(path_label))
if (not os.path.isfile(path_result)) | recompute:
labels, model = erode_label_map(labels, labels_to_erode, erosion_factors, gpu, model, return_model=True)
utils.save_volume(labels, aff, h, path_result)
def upsample_labels_in_dir(labels_dir,
target_res,
result_dir,
path_label_list=None,
path_freesurfer='/usr/local/freesurfer/',
recompute=True):
"""This function upsamples all label maps within a folder. Importantly, each label map is converted into probability
maps for all label values, and all these maps are upsampled separately. The upsampled label maps are recovered by
taking the argmax of the label values probability maps.
:param labels_dir: path of directory with label maps to upsample
:param target_res: resolution at which to upsample the label maps. can be a single number (isotropic), or a list.
:param result_dir: path of directory where the upsampled label maps will be writen
:param path_label_list: (optional) path of numpy array containing all label values.
Computed automatically if not given.
:param path_freesurfer: (optional) path freesurfer home (upsampling performed with mri_convert)
:param recompute: (optional) whether to recompute result files even if they already exists
"""
# prepare result dir
utils.mkdir(result_dir)
# set up FreeSurfer
os.environ['FREESURFER_HOME'] = path_freesurfer
os.system(os.path.join(path_freesurfer, 'SetUpFreeSurfer.sh'))
mri_convert = os.path.join(path_freesurfer, 'bin/mri_convert')
# list label maps
path_labels = utils.list_images_in_folder(labels_dir)
labels_shape, aff, n_dims, _, h, _ = utils.get_volume_info(path_labels[0], max_channels=3)
# build command
target_res = utils.reformat_to_list(target_res, length=n_dims)
post_cmd = '-voxsize ' + ' '.join([str(r) for r in target_res]) + ' -odt float'
# load label list and corresponding LUT to make sure that labels go from 0 to N-1
label_list, _ = utils.get_list_labels(path_label_list, labels_dir=path_labels, FS_sort=False)
new_label_list = np.arange(len(label_list), dtype='int32')
lut = utils.get_mapping_lut(label_list)
# loop over label maps
loop_info = utils.LoopInfo(len(path_labels), 5, 'upsampling', True)
for idx, path_label in enumerate(path_labels):
loop_info.update(idx)
path_result = os.path.join(result_dir, os.path.basename(path_label))
if (not os.path.isfile(path_result)) | recompute:
# load volume
labels, aff, h = utils.load_volume(path_label, im_only=False)
labels = lut[labels.astype('int')]
# create individual folders for label map
basefilename = utils.strip_extension(os.path.basename(path_label))
indiv_label_dir = os.path.join(result_dir, basefilename)
upsample_indiv_label_dir = os.path.join(result_dir, basefilename + '_upsampled')
utils.mkdir(indiv_label_dir)
utils.mkdir(upsample_indiv_label_dir)
# loop over label values
for label in new_label_list:
path_mask = os.path.join(indiv_label_dir, str(label) + '.nii.gz')
path_mask_upsampled = os.path.join(upsample_indiv_label_dir, str(label) + '.nii.gz')
if not os.path.isfile(path_mask):
mask = (labels == label) * 1.0
utils.save_volume(mask, aff, h, path_mask)
if not os.path.isfile(path_mask_upsampled):
cmd = utils.mkcmd(mri_convert, path_mask, path_mask_upsampled, post_cmd)
os.system(cmd)
# compute argmax of upsampled probability maps (upload them one at a time)
probmax, aff, h = utils.load_volume(os.path.join(upsample_indiv_label_dir, '0.nii.gz'), im_only=False)
labels = np.zeros(probmax.shape, dtype='int')
for label in new_label_list:
prob = utils.load_volume(os.path.join(upsample_indiv_label_dir, str(label) + '.nii.gz'))
idx = prob > probmax
labels[idx] = label
probmax[idx] = prob[idx]
utils.save_volume(label_list[labels], aff, h, path_result, dtype='int32')
def compute_hard_volumes_in_dir(labels_dir,
voxel_volume=None,
path_label_list=None,
skip_background=True,
path_numpy_result=None,
path_csv_result=None,
FS_sort=False):
"""Compute hard volumes of structures for all label maps in a folder.
:param labels_dir: path of directory with input label maps
:param voxel_volume: (optional) volume of the voxels. If None, it will be directly inferred from the file header.
Set to 1 for a voxel count.
:param path_label_list: (optional) list of labels to compute volumes for.
Can be an int, a sequence, or a numpy array. If None, the volumes of all label values are computed for each subject.
:param skip_background: (optional) whether to skip computing the volume of the background.
If label_list is None, this assumes background value is 0.
If label_list is not None, this assumes the background is the first value in label list.
:param path_numpy_result: (optional) path where to write the result volumes as a numpy array.
:param path_csv_result: (optional) path where to write the results as csv file.
:param FS_sort: (optional) whether to sort the labels in FreeSurfer order.
:return: numpy array with the volume of each structure for all subjects.
Rows represent label values, and columns represent subjects.
"""
# create result directories
if path_numpy_result is not None:
utils.mkdir(os.path.dirname(path_numpy_result))
if path_csv_result is not None:
utils.mkdir(os.path.dirname(path_csv_result))
# load or compute labels list
label_list, _ = utils.get_list_labels(path_label_list, labels_dir, FS_sort=FS_sort)
# create csv volume file if necessary
if path_csv_result is not None:
if skip_background:
cvs_header = [['subject'] + [str(lab) for lab in label_list[1:]]]
else:
cvs_header = [['subject'] + [str(lab) for lab in label_list]]
with open(path_csv_result, 'w') as csvFile:
writer = csv.writer(csvFile)
writer.writerows(cvs_header)
csvFile.close()
# loop over label maps
path_labels = utils.list_images_in_folder(labels_dir)
if skip_background:
volumes = np.zeros((label_list.shape[0] - 1, len(path_labels)))
else:
volumes = np.zeros((label_list.shape[0], len(path_labels)))
loop_info = utils.LoopInfo(len(path_labels), 10, 'processing', True)
for idx, path_label in enumerate(path_labels):
loop_info.update(idx)
# load segmentation, and compute unique labels
labels, _, _, _, _, _, subject_res = utils.get_volume_info(path_label, return_volume=True)
if voxel_volume is None:
voxel_volume = float(np.prod(subject_res))
subject_volumes = compute_hard_volumes(labels, voxel_volume, label_list, skip_background)
volumes[:, idx] = subject_volumes
# write volumes
if path_csv_result is not None:
subject_volumes = np.around(volumes[:, idx], 3)
row = [utils.strip_suffix(os.path.basename(path_label))] + [str(vol) for vol in subject_volumes]
with open(path_csv_result, 'a') as csvFile:
writer = csv.writer(csvFile)
writer.writerow(row)
csvFile.close()
# write numpy array if necessary
if path_numpy_result is not None:
np.save(path_numpy_result, volumes)
return volumes
def build_atlas(labels_dir,
label_list,
align_centre_of_mass=False,
margin=15,
shape=None,
path_atlas=None):
"""This function builds a binary atlas (defined by label values > 0) from several label maps.
:param labels_dir: path of directory with input label maps
:param label_list: list of all labels in the label maps. If there is more than 1 value here, the different channels
of the atlas (each corresponding to the probability map of a given label) will in the same order as in this list.
:param align_centre_of_mass: whether to build the atlas by aligning the center of mass of each label map.
If False, the atlas has the same size as the input label maps, which are assumed to be aligned.
:param margin: (optional) If align_centre_of_mass is True, margin by which to crop the input label maps around
their center of mass. Therefore it controls the size of the output atlas: (2*margin + 1)**n_dims.
:param shape: shape of the output atlas.
:param path_atlas: (optional) path where the output atlas will be writen.
Default is None, where the atlas is not saved."""
# list of all label maps and create result dir
path_labels = utils.list_images_in_folder(labels_dir)
n_label_maps = len(path_labels)
utils.mkdir(os.path.dirname(path_atlas))
# read list labels and create lut
label_list = np.array(utils.reformat_to_list(label_list, load_as_numpy=True, dtype='int'))
lut = utils.get_mapping_lut(label_list)
n_labels = len(label_list)
# create empty atlas
im_shape, aff, n_dims, _, h, _ = utils.get_volume_info(path_labels[0], aff_ref=np.eye(4))
if align_centre_of_mass:
shape = [margin * 2] * n_dims
else:
shape = utils.reformat_to_list(shape, length=n_dims) if shape is not None else im_shape
shape = shape + [n_labels] if n_labels > 1 else shape
atlas = np.zeros(shape)
# loop over label maps
loop_info = utils.LoopInfo(n_label_maps, 10, 'processing', True)
for idx, path_label in enumerate(path_labels):
loop_info.update(idx)
# load label map and build mask
lab = utils.load_volume(path_label, dtype='int32', aff_ref=np.eye(4))
lab = correct_label_map(lab, [31, 63, 72], [4, 43, 0])
lab = lut[lab.astype('int')]
lab = pad_volume(lab, shape[:n_dims])
lab = crop_volume(lab, cropping_shape=shape[:n_dims])
indices = np.where(lab > 0)
if len(label_list) > 1:
lab = np.identity(n_labels)[lab]
# crop label map around centre of mass
if align_centre_of_mass:
centre_of_mass = np.array([np.mean(indices[0]), np.mean(indices[1]), np.mean(indices[2])], dtype='int32')
min_crop = centre_of_mass - margin
max_crop = centre_of_mass + margin
atlas += lab[min_crop[0]:max_crop[0], min_crop[1]:max_crop[1], min_crop[2]:max_crop[2], ...]
# otherwise just add the one-hot labels
else:
atlas += lab
# normalise atlas and save it if necessary
atlas /= n_label_maps
atlas = align_volume_to_ref(atlas, np.eye(4), aff_ref=aff, n_dims=n_dims)
if path_atlas is not None:
utils.save_volume(atlas, aff, h, path_atlas)
return atlas
# ---------------------------------------------------- edit dataset ----------------------------------------------------
def check_images_and_labels(image_dir, labels_dir, verbose=True):
"""Check if corresponding images and labels have the same affine matrices and shapes.
Labels are matched to images by sorting order.
:param image_dir: path of directory with input images
:param labels_dir: path of directory with corresponding label maps
:param verbose: whether to print out info
"""
# list images and labels
path_images = utils.list_images_in_folder(image_dir)
path_labels = utils.list_images_in_folder(labels_dir)
assert len(path_images) == len(path_labels), 'different number of files in image_dir and labels_dir'
# loop over images and labels
loop_info = utils.LoopInfo(len(path_images), 10, 'checking', verbose) if verbose else None
for idx, (path_image, path_label) in enumerate(zip(path_images, path_labels)):
if loop_info is not None:
loop_info.update(idx)
# load images and labels
im, aff_im, h_im = utils.load_volume(path_image, im_only=False)
lab, aff_lab, h_lab = utils.load_volume(path_label, im_only=False)
aff_im_list = np.round(aff_im, 2).tolist()
aff_lab_list = np.round(aff_lab, 2).tolist()
# check matching affine and shape
if aff_lab_list != aff_im_list:
print('aff mismatch :\n' + path_image)
print(aff_im_list)
print(path_label)
print(aff_lab_list)
print('')
if lab.shape != im.shape:
print('shape mismatch :\n' + path_image)
print(im.shape)
print('\n' + path_label)
print(lab.shape)
print('')
def crop_dataset_to_minimum_size(labels_dir, result_dir, image_dir=None, image_result_dir=None, margin=5):
"""Crop all label maps in a directory to the minimum possible common size, with a margin.
This is achieved by cropping each label map individually to the minimum size, and by padding all the cropped maps to
the same size (taken to be the maximum size of the cropped maps).
If images are provided, they undergo the same transformations as their corresponding label maps.
:param labels_dir: path of directory with input label maps
:param result_dir: path of directory where cropped label maps will be writen
:param image_dir: (optional) if not None, the cropping will be applied to all images in this directory
:param image_result_dir: (optional) path of directory where cropped images will be writen
:param margin: (optional) margin to apply around the label maps during cropping
"""
# create result dir
utils.mkdir(result_dir)
if image_dir is not None:
assert image_result_dir is not None, 'image_result_dir should not be None if image_dir is specified'
utils.mkdir(image_result_dir)
# list labels and images
path_labels = utils.list_images_in_folder(labels_dir)
if image_dir is not None:
path_images = utils.list_images_in_folder(image_dir)
else:
path_images = [None] * len(path_labels)
_, _, n_dims, _, _, _ = utils.get_volume_info(path_labels[0])
# loop over label maps for cropping
print('\ncropping labels to individual minimum size')
maximum_size = np.zeros(n_dims)
loop_info = utils.LoopInfo(len(path_labels), 10, 'cropping', True)
for idx, (path_label, path_image) in enumerate(zip(path_labels, path_images)):
loop_info.update(idx)
# crop label maps and update maximum size of cropped map
label, aff, h = utils.load_volume(path_label, im_only=False)
label, cropping, aff = crop_volume_around_region(label, aff=aff)
utils.save_volume(label, aff, h, os.path.join(result_dir, os.path.basename(path_label)))
maximum_size = np.maximum(maximum_size, np.array(label.shape) + margin * 2) # *2 to add margin on each side
# crop images if required
if path_image is not None:
image, aff_im, h_im = utils.load_volume(path_image, im_only=False)
image, aff_im = crop_volume_with_idx(image, cropping, aff=aff_im)
utils.save_volume(image, aff_im, h_im, os.path.join(image_result_dir, os.path.basename(path_image)))
# loop over label maps for padding
print('\npadding labels to same size')
loop_info = utils.LoopInfo(len(path_labels), 10, 'padding', True)
for idx, (path_label, path_image) in enumerate(zip(path_labels, path_images)):
loop_info.update(idx)
# pad label maps to maximum size
path_result = os.path.join(result_dir, os.path.basename(path_label))
label, aff, h = utils.load_volume(path_result, im_only=False)
label, aff = pad_volume(label, maximum_size, aff=aff)
utils.save_volume(label, aff, h, path_result)
# crop images if required
if path_image is not None:
path_result = os.path.join(image_result_dir, os.path.basename(path_image))
image, aff, h = utils.load_volume(path_result, im_only=False)
image, aff = pad_volume(image, maximum_size, aff=aff)
utils.save_volume(image, aff, h, path_result)
def crop_dataset_around_region_of_same_size(labels_dir,
result_dir,
image_dir=None,
image_result_dir=None,
margin=0,
recompute=True):
# create result dir
utils.mkdir(result_dir)
if image_dir is not None:
assert image_result_dir is not None, 'image_result_dir should not be None if image_dir is specified'
utils.mkdir(image_result_dir)
# list labels and images
path_labels = utils.list_images_in_folder(labels_dir)
path_images = utils.list_images_in_folder(image_dir) if image_dir is not None else [None] * len(path_labels)
_, _, n_dims, _, _, _ = utils.get_volume_info(path_labels[0])
recompute_labels = any([not os.path.isfile(os.path.join(result_dir, os.path.basename(path)))
for path in path_labels])
if (image_dir is not None) & (not recompute_labels):
recompute_labels = any([not os.path.isfile(os.path.join(image_result_dir, os.path.basename(path)))
for path in path_images])
# get minimum patch shape so that no labels are left out when doing the cropping later on
max_crop_shape = np.zeros(n_dims)
if recompute_labels:
for path_label in path_labels:
label, aff, _ = utils.load_volume(path_label, im_only=False)
label = align_volume_to_ref(label, aff, aff_ref=np.eye(4))
label = get_largest_connected_component(label > 0, structure=np.ones((3, 3, 3)))
_, cropping = crop_volume_around_region(label)
max_crop_shape = np.maximum(cropping[n_dims:] - cropping[:n_dims], max_crop_shape)
max_crop_shape += np.array(utils.reformat_to_list(margin, length=n_dims, dtype='int'))
print('max_crop_shape: ', max_crop_shape)
# crop shapes (possibly with padding if images are smaller than crop shape)
for path_label, path_image in zip(path_labels, path_images):
path_label_result = os.path.join(result_dir, os.path.basename(path_label))
path_image_result = os.path.join(image_result_dir, os.path.basename(path_image))
if (not os.path.isfile(path_image_result)) | (not os.path.isfile(path_label_result)) | recompute:
# load labels
label, aff, h_la = utils.load_volume(path_label, im_only=False, dtype='int32')
label, aff_new = align_volume_to_ref(label, aff, aff_ref=np.eye(4), return_aff=True)
vol_shape = np.array(label.shape[:n_dims])
if path_image is not None:
image, _, h_im = utils.load_volume(path_image, im_only=False)
image = align_volume_to_ref(image, aff, aff_ref=np.eye(4))
else:
image = h_im = None
# mask labels
mask = get_largest_connected_component(label > 0, structure=np.ones((3, 3, 3)))
label[np.logical_not(mask)] = 0
# find cropping indices
indices = np.nonzero(mask)
min_idx = np.maximum(np.array([np.min(idx) for idx in indices]) - margin, 0)
max_idx = np.minimum(np.array([np.max(idx) for idx in indices]) + 1 + margin, vol_shape)
# expand/retract (depending on the desired shape) the cropping region around the centre
intermediate_vol_shape = max_idx - min_idx
min_idx = min_idx - np.int32(np.ceil((max_crop_shape - intermediate_vol_shape) / 2))
max_idx = max_idx + np.int32(np.floor((max_crop_shape - intermediate_vol_shape) / 2))
# check if we need to pad the output to the desired shape
min_padding = np.abs(np.minimum(min_idx, 0))
max_padding = np.maximum(max_idx - vol_shape, 0)
if np.any(min_padding > 0) | np.any(max_padding > 0):
pad_margins = tuple([(min_padding[i], max_padding[i]) for i in range(n_dims)])
else:
pad_margins = None
cropping = np.concatenate([np.maximum(min_idx, 0), np.minimum(max_idx, vol_shape)])
# crop volume
label = crop_volume_with_idx(label, cropping, n_dims=n_dims)
if path_image is not None:
image = crop_volume_with_idx(image, cropping, n_dims=n_dims)
# pad volume if necessary
if pad_margins is not None:
label = np.pad(label, pad_margins, mode='constant', constant_values=0)
if path_image is not None:
_, n_channels = utils.get_dims(image.shape)
pad_margins = tuple(list(pad_margins) + [(0, 0)]) if n_channels > 1 else pad_margins
image = np.pad(image, pad_margins, mode='constant', constant_values=0)
# update aff
if n_dims == 2:
min_idx = np.append(min_idx, 0)
aff_new[0:3, -1] = aff_new[0:3, -1] + aff_new[:3, :3] @ min_idx
# write labels
label, aff_final = align_volume_to_ref(label, aff_new, aff_ref=aff, return_aff=True)
utils.save_volume(label, aff_final, h_la, path_label_result, dtype='int32')
if path_image is not None:
image = align_volume_to_ref(image, aff_new, aff_ref=aff)
utils.save_volume(image, aff_final, h_im, path_image_result)
def crop_dataset_around_region(image_dir, labels_dir, image_result_dir, labels_result_dir, margin=0,
cropping_shape_div_by=None, recompute=True):
# create result dir
utils.mkdir(image_result_dir)
utils.mkdir(labels_result_dir)
# list volumes and masks
path_images = utils.list_images_in_folder(image_dir)
path_labels = utils.list_images_in_folder(labels_dir)
_, _, n_dims, n_channels, _, _ = utils.get_volume_info(path_labels[0])
# loop over images and labels
loop_info = utils.LoopInfo(len(path_images), 10, 'cropping', True)
for idx, (path_image, path_label) in enumerate(zip(path_images, path_labels)):
loop_info.update(idx)
path_label_result = os.path.join(labels_result_dir, os.path.basename(path_label))
path_image_result = os.path.join(image_result_dir, os.path.basename(path_image))
if (not os.path.isfile(path_label_result)) | (not os.path.isfile(path_image_result)) | recompute:
image, aff, h_im = utils.load_volume(path_image, im_only=False)
label, _, h_lab = utils.load_volume(path_label, im_only=False)
mask = get_largest_connected_component(label > 0, structure=np.ones((3, 3, 3)))
label[np.logical_not(mask)] = 0
vol_shape = np.array(label.shape[:n_dims])
# find cropping indices
indices = np.nonzero(mask)
min_idx = np.maximum(np.array([np.min(idx) for idx in indices]) - margin, 0)
max_idx = np.minimum(np.array([np.max(idx) for idx in indices]) + 1 + margin, vol_shape)
# expand/retract (depending on the desired shape) the cropping region around the centre
intermediate_vol_shape = max_idx - min_idx
cropping_shape = np.array([utils.find_closest_number_divisible_by_m(s, cropping_shape_div_by,
answer_type='higher')
for s in intermediate_vol_shape])
min_idx = min_idx - np.int32(np.ceil((cropping_shape - intermediate_vol_shape) / 2))
max_idx = max_idx + np.int32(np.floor((cropping_shape - intermediate_vol_shape) / 2))
# check if we need to pad the output to the desired shape
min_padding = np.abs(np.minimum(min_idx, 0))
max_padding = np.maximum(max_idx - vol_shape, 0)
if np.any(min_padding > 0) | np.any(max_padding > 0):
pad_margins = tuple([(min_padding[i], max_padding[i]) for i in range(n_dims)])
else:
pad_margins = None
cropping = np.concatenate([np.maximum(min_idx, 0), np.minimum(max_idx, vol_shape)])
# crop volume
label = crop_volume_with_idx(label, cropping, n_dims=n_dims)
image = crop_volume_with_idx(image, cropping, n_dims=n_dims)
# pad volume if necessary
if pad_margins is not None:
label = np.pad(label, pad_margins, mode='constant', constant_values=0)
pad_margins = tuple(list(pad_margins) + [(0, 0)]) if n_channels > 1 else pad_margins
image = np.pad(image, pad_margins, mode='constant', constant_values=0)
# update aff
if n_dims == 2:
min_idx = np.append(min_idx, 0)
aff[0:3, -1] = aff[0:3, -1] + aff[:3, :3] @ min_idx
# write results
utils.save_volume(image, aff, h_im, path_image_result)
utils.save_volume(label, aff, h_lab, path_label_result, dtype='int32')
def subdivide_dataset_to_patches(patch_shape,
image_dir=None,
image_result_dir=None,
labels_dir=None,
labels_result_dir=None,
full_background=True,
remove_after_dividing=False):
"""This function subdivides images and/or label maps into several smaller patches of specified shape.
:param patch_shape: shape of patches to create. Can either be an int, a sequence, or a 1d numpy array.
:param image_dir: (optional) path of directory with input images
:param image_result_dir: (optional) path of directory where image patches will be writen
:param labels_dir: (optional) path of directory with input label maps
:param labels_result_dir: (optional) path of directory where label map patches will be writen
:param full_background: (optional) whether to keep patches only labelled as background (only if label maps are
provided).
:param remove_after_dividing: (optional) whether to delete input images after having divided them in smaller
patches. This enables to save disk space in the subdivision process.
"""
# create result dir and list images and label maps
assert (image_dir is not None) | (labels_dir is not None), \
'at least one of image_dir or labels_dir should not be None.'
if image_dir is not None:
assert image_result_dir is not None, 'image_result_dir should not be None if image_dir is specified'
utils.mkdir(image_result_dir)
path_images = utils.list_images_in_folder(image_dir)
else:
path_images = None
if labels_dir is not None:
assert labels_result_dir is not None, 'labels_result_dir should not be None if labels_dir is specified'
utils.mkdir(labels_result_dir)
path_labels = utils.list_images_in_folder(labels_dir)
else:
path_labels = None
if path_images is None:
path_images = [None] * len(path_labels)
if path_labels is None:
path_labels = [None] * len(path_images)
# reformat path_shape
patch_shape = utils.reformat_to_list(patch_shape)
n_dims, _ = utils.get_dims(patch_shape)
# loop over images and labels
loop_info = utils.LoopInfo(len(path_images), 10, 'processing', True)
for idx, (path_image, path_label) in enumerate(zip(path_images, path_labels)):
loop_info.update(idx)
# load image and labels
if path_image is not None:
im, aff_im, h_im = utils.load_volume(path_image, im_only=False, squeeze=False)
else:
im = aff_im = h_im = None
if path_label is not None:
lab, aff_lab, h_lab = utils.load_volume(path_label, im_only=False, squeeze=True)
else:
lab = aff_lab = h_lab = None
# get volume shape
if path_image is not None:
shape = im.shape
else:
shape = lab.shape
# crop image and label map to size divisible by patch_shape
new_size = np.array([utils.find_closest_number_divisible_by_m(shape[i], patch_shape[i]) for i in range(n_dims)])
crop = np.round((np.array(shape[:n_dims]) - new_size) / 2).astype('int')
crop = np.concatenate((crop, crop + new_size), axis=0)
if (im is not None) & (n_dims == 2):
im = im[crop[0]:crop[2], crop[1]:crop[3], ...]
elif (im is not None) & (n_dims == 3):
im = im[crop[0]:crop[3], crop[1]:crop[4], crop[2]:crop[5], ...]
if (lab is not None) & (n_dims == 2):
lab = lab[crop[0]:crop[2], crop[1]:crop[3], ...]
elif (lab is not None) & (n_dims == 3):
lab = lab[crop[0]:crop[3], crop[1]:crop[4], crop[2]:crop[5], ...]
# loop over patches
n_im = 0
n_crop = (new_size / patch_shape).astype('int')
for i in range(n_crop[0]):
i *= patch_shape[0]
for j in range(n_crop[1]):
j *= patch_shape[1]
if n_dims == 2:
# crop volumes
if lab is not None:
temp_la = lab[i:i + patch_shape[0], j:j + patch_shape[1], ...]
else:
temp_la = None
if im is not None:
temp_im = im[i:i + patch_shape[0], j:j + patch_shape[1], ...]
else:
temp_im = None
# write patches
if temp_la is not None:
if full_background | (not (temp_la == 0).all()):
n_im += 1
utils.save_volume(temp_la, aff_lab, h_lab, os.path.join(labels_result_dir,
os.path.basename(path_label.replace('.nii.gz', '_%d.nii.gz' % n_im))))
if temp_im is not None:
utils.save_volume(temp_im, aff_im, h_im, os.path.join(image_result_dir,
os.path.basename(path_image.replace('.nii.gz', '_%d.nii.gz' % n_im))))
else:
utils.save_volume(temp_im, aff_im, h_im, os.path.join(image_result_dir,
os.path.basename(path_image.replace('.nii.gz', '_%d.nii.gz' % n_im))))
elif n_dims == 3:
for k in range(n_crop[2]):
k *= patch_shape[2]
# crop volumes
if lab is not None:
temp_la = lab[i:i + patch_shape[0], j:j + patch_shape[1], k:k + patch_shape[2], ...]
else:
temp_la = None
if im is not None:
temp_im = im[i:i + patch_shape[0], j:j + patch_shape[1], k:k + patch_shape[2], ...]
else:
temp_im = None
# write patches
if temp_la is not None:
if full_background | (not (temp_la == 0).all()):
n_im += 1
utils.save_volume(temp_la, aff_lab, h_lab, os.path.join(labels_result_dir,
os.path.basename(path_label.replace('.nii.gz', '_%d.nii.gz' % n_im))))
if temp_im is not None:
utils.save_volume(temp_im, aff_im, h_im, os.path.join(image_result_dir,
os.path.basename(path_image.replace('.nii.gz',
'_%d.nii.gz' % n_im))))
else:
utils.save_volume(temp_im, aff_im, h_im, os.path.join(image_result_dir,
os.path.basename(path_image.replace('.nii.gz', '_%d.nii.gz' % n_im))))
if remove_after_dividing:
if path_image is not None:
os.remove(path_image)
if path_label is not None:
os.remove(path_label)