src/move/tasks/encode_data.py
__all__ = ["encode_data"]

from pathlib import Path

import numpy as np

from move.conf.schema import DataConfig
from move.core.logging import get_logger
from move.data import io, preprocessing
from move.visualization.dataset_distributions import plot_value_distributions


def encode_data(config: DataConfig):
    """Encodes categorical and continuous datasets specified in configuration.

    Categorical data is one-hot encoded, whereas continuous data is z-score
    normalized.

    Args:
        config: data configuration
    """
    logger = get_logger(__name__)
    logger.info("Beginning task: encode data")
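
    # Resolve the data directories and create them if they do not exist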
    raw_data_path = Path(config.raw_data_path)
    raw_data_path.mkdir(exist_ok=True)
    interim_data_path = Path(config.interim_data_path)
    interim_data_path.mkdir(exist_ok=True, parents=True)
    output_path = Path(config.results_path) / "encoded_datasets"
    output_path.mkdir(exist_ok=True, parents=True)
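
    # Sample names are used to align the rows of every dataset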
    sample_names = io.read_names(raw_data_path / f"{config.sample_names}.txt")
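
    # One-hot encode each categorical dataset and record its category mapping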
    mappings = {}
    for dataset_name in config.categorical_names:
        # NOTE: each entry's string representation resolves to the dataset name
        logger.info(f"Encoding '{dataset_name}'")
        filepath = raw_data_path / f"{dataset_name}.tsv"
        names, values = io.read_tsv(filepath, sample_names)
        values, mapping = preprocessing.one_hot_encode(values)
        mappings[dataset_name] = mapping
        io.dump_names(interim_data_path / f"{dataset_name}.txt", names)
        np.save(interim_data_path / f"{dataset_name}.npy", values)
    if mappings:
        io.dump_mappings(interim_data_path / "mappings.json", mappings)
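
    # Encode continuous datasets: optional log2 transform and z-score scaling,
    # unless scaling is disabled for a dataset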
    for input_config in config.continuous_inputs:
        scale = getattr(input_config, "scale", True)
        action_name = "Encoding" if scale else "Reading"
        dataset_name = input_config.name
        logger.info(f"{action_name} '{dataset_name}'")
        filepath = raw_data_path / f"{dataset_name}.tsv"
        names, values = io.read_tsv(filepath, sample_names)

        # Plot the value distribution of the dataset before preprocessing:
        fig = plot_value_distributions(values)
        fig_path = str(
            output_path / f"Value_distribution_{dataset_name}_unprocessed.png"
        )
        fig.savefig(fig_path)

        if scale:
            logger.debug(
                f"Scaling dataset: {dataset_name}, log2 transform: {input_config.log2}"
            )
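            # Z-score normalize; zero-variance columns are dropped, so apply
            # the returned mask to the feature names as well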
            values, mask_1d = preprocessing.scale(values, input_config.log2)
            names = names[mask_1d]
            logger.debug(f"Columns with zero variance: {np.sum(~mask_1d)}")
            # Plot the value distribution after preprocessing:
            fig = plot_value_distributions(values)
            fig_path = str(output_path / f"Value_distribution_{dataset_name}.png")
            fig.savefig(fig_path)
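
        # Persist the feature names and encoded values for this dataset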
        io.dump_names(interim_data_path / f"{dataset_name}.txt", names)
        np.save(interim_data_path / f"{dataset_name}.npy", values)