Diff of /utils/embeddings.py [000000] .. [3f1788]

--- /dev/null
+++ b/utils/embeddings.py
@@ -0,0 +1,176 @@
+#!/usr/bin/env python3
+
+import logging
+import sys
+import numpy as np
+import torch
+
+from torch import nn
+from typing import List
+from pathlib import Path
+from bidict import bidict
+from annoy import AnnoyIndex
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.WARNING)
+sh = logging.StreamHandler()
+sh.setFormatter(logging.Formatter('%(levelname)s:%(name)s: %(message)s'))
+logger.addHandler(sh)
+
+
+class PretrainedEmbeddings:
+  """
+    A wrapper around pre-trained word vectors
+  """
+  def __init__(self, idx_token_bidict: bidict, pretrained_emb_matrix: np.ndarray,
+      is_annoy: bool = False) -> None:
+    """
+      Args:
+        idx_token_bidict: a bidict mapping between indices and tokens
+        pretrained_emb_matrix: matrix of pre-trained vectors, one row per token
+        is_annoy: whether to build an Annoy index over the pre-trained vectors
+    """
+    self.idx_token_bidict = idx_token_bidict
+    self._pretrained_emb_matrix = pretrained_emb_matrix
+    self._vocab_sz = len(pretrained_emb_matrix)
+    self._emb_sz = pretrained_emb_matrix.shape[-1]
+    self.is_annoy = is_annoy
+    self._custom_emb_matrix = None
+
+    if self.is_annoy:
+      self.annoy_idx = AnnoyIndex(self._emb_sz, metric='euclidean')
+      logger.info("Building Annoy index...")
+      for i in range(self._vocab_sz):
+        self.annoy_idx.add_item(i, self._pretrained_emb_matrix[i])
+      self.annoy_idx.build(50)
+      logger.info("Finished building Annoy index")
+
+  def make_custom_embeddings(self, tokens: List[str]) -> None:
+    """
+      Create a custom embedding matrix for a specific set of tokens
+
+      Tokens found in the pre-trained vocabulary reuse their pre-trained
+      vectors; unknown tokens are initialized with Xavier-uniform noise.
+
+      Args:
+        tokens: a list of tokens in the dataset
+    """
+    self._custom_emb_matrix = np.zeros((len(tokens), self._emb_sz))
+    for idx, token in enumerate(tokens):
+      if token in self.idx_token_bidict.inverse:
+        self._custom_emb_matrix[idx, :] = self._pretrained_emb_matrix[
+            self.idx_token_bidict.inverse[token]]
+      else:
+        embedding_i = torch.empty(1, self._emb_sz)
+        nn.init.xavier_uniform_(embedding_i)
+        self._custom_emb_matrix[idx, :] = embedding_i.numpy()
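+
+  # A minimal usage sketch (variable names are hypothetical): the custom
+  # matrix is typically used to seed a trainable torch embedding layer, e.g.
+  #
+  #   embeddings.make_custom_embeddings(dataset_tokens)
+  #   emb_layer = nn.Embedding.from_pretrained(
+  #       torch.from_numpy(embeddings.custom_embeddings).float(), freeze=False)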
+
+  @property
+  def pretrained_embeddings(self):
+    return self._pretrained_emb_matrix
+
+  @property
+  def custom_embeddings(self):
+    return self._custom_emb_matrix
+
+  @classmethod
+  def from_file(cls, embedding_file: Path, is_annoy: bool = False, verbose: bool = False):
+    """
+      Instantiate from a pre-trained vector file
+
+      The vector file should be of the format:
+        word0 x0_0 x0_1 ... x0_N
+        word1 x1_0 x1_1 ... x1_N
+
+      Args:
+        embedding_file: location of the file
+        is_annoy: whether to build an Annoy index
+        verbose: if True, log progress at INFO level
+      Returns:
+        Instance of PretrainedEmbeddings
+    """
+    logging_level = logging.INFO if verbose else logger.getEffectiveLevel()
+    logger.setLevel(logging_level)
+    idx_word_bidict = bidict()
+    pretrained_emb_matrix = []
+
+    logger.info(f"Loading pretrained embeddings from file {embedding_file}...")
+    with open(embedding_file, 'r', encoding='utf-8') as fp:
+      for line in fp:
+        # rstrip drops the trailing newline in all cases; the original check
+        # only caught lines ending in a space before the newline
+        parts = line.rstrip().split(' ')
+        word = parts[0]
+        vec = np.array([float(x) for x in parts[1:]])
+
+        idx_word_bidict.put(len(idx_word_bidict), word)
+        pretrained_emb_matrix.append(vec)
+
+    logger.info("Finished!")
+    return cls(idx_word_bidict, np.stack(pretrained_emb_matrix), is_annoy=is_annoy)
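+
+  # A minimal sketch, assuming a GloVe-style text file (filename hypothetical):
+  #
+  #   embeddings = PretrainedEmbeddings.from_file(Path('glove.6B.50d.txt'),
+  #                                               is_annoy=True, verbose=True)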
+
+  def get_embedding(self, word: str) -> np.ndarray:
+    """Return the pre-trained vector for `word`; raises KeyError if unknown"""
+    return self._pretrained_emb_matrix[self.idx_token_bidict.inverse[word]]
+
+  @property
+  def vocab_size(self) -> int:
+    return self._vocab_sz
+
+  def _get_neighbors(self, vector: np.ndarray, n: int = 1) -> List[str]:
+    """
+      Given a vector, return its n nearest neighbors
+
+      Args:
+        vector: should match the size of the vectors in the Annoy index
+        n: number of neighbors to return
+    """
+    nn_idxs = self.annoy_idx.get_nns_by_vector(vector, n)
+    return [self.idx_token_bidict[neighbor_idx] for neighbor_idx in nn_idxs]
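+
+  # For example (a hypothetical internal call, assuming is_annoy=True and that
+  # 'king' is in the vocabulary):
+  #
+  #   self._get_neighbors(self.get_embedding('king'), n=5)
+  #
+  # would return the 5 tokens closest to 'king' by euclidean distance.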
+
+  def __len__(self):
+    # Note: length is defined as the embedding dimensionality, not the vocab size
+    return self._emb_sz
+
+  def get_analogy(self, word1: str, word2: str, word3: str) -> str:
+    """
+      Computes solutions to analogies using word embeddings
+
+      Analogies are of the form: word1 is to word2 as word3 is to ____
+    """
+    if not self.is_annoy:
+      raise RuntimeError(f"AnnoyIndex not built! is_annoy is {self.is_annoy}!")
+
+    # get the embeddings of the three given words
+    vec1 = self.get_embedding(word1)
+    vec2 = self.get_embedding(word2)
+    vec3 = self.get_embedding(word3)
+
+    # compute 4th word embedding
+    spatial_relationship = vec2 - vec1
+    vec4 = vec3 + spatial_relationship
+
+    closest_words = self._get_neighbors(vec4, n=4)
+    existing_words = {word1, word2, word3}
+    closest_words = [word for word in closest_words if word not in existing_words]
+
+    if len(closest_words) == 0:
+      return 'Could not find nearest neighbors for computed vector!'
+
+    return '\n'.join(f'{word1} : {word2} :: {word3} : {word4}'
+                     for word4 in closest_words)
+
+  def __repr__(self):
+    return f"Pretrained Embeddings of {self._emb_sz} dimensions with {self._vocab_sz} words"
+
+if __name__ == '__main__':
+  if len(sys.argv) != 2:
+    print(f"Usage: {sys.argv[0]} path_to_embedding_file")
+    sys.exit(1)
+
+  embedding_file = Path(sys.argv[1])
+  embeddings = PretrainedEmbeddings.from_file(embedding_file, is_annoy=True)
+  print(embeddings)
+  print("--------------------------")
+  print(embeddings.get_analogy('man', 'he', 'woman'))
+  print("--------------------------")
+  print(embeddings.get_analogy('man', 'uncle', 'woman'))
+  print("--------------------------")
+  print(embeddings.get_analogy('talk', 'communicate', 'read'))
+  print("--------------------------")
+  print(embeddings.get_analogy('man', 'king', 'woman'))
+  print("--------------------------")
+  print(embeddings.get_analogy('king', 'queen', 'husband'))