import itertools
import numpy as np
import re
import numbers
import sys
from scipy import linalg
import pandas as pd
import grammar
from collections import OrderedDict
import matplotlib.pyplot as plt
ver = 0.3
class Fillers(object):
"""
"""
def __init__(self, cfg, add_null=True):
null_filler = '_'
self.g = grammar.Grammar(cfg)
self._get_filler_names()
if add_null:
self.null = null_filler
self.names.append(null_filler)
else:
self.null = None
self._construct_treelets()
self._construct_pairs()
def _get_filler_names(self):
hnf_rules = self.g.hnfRules
fillers = []
for key in hnf_rules:
fillers.append(key)
for item in hnf_rules[key]:
fillers.extend(item)
fillers = list(set(fillers))
fillers.sort()
self.names = fillers
def _construct_treelets(self):
hnf_rules = self.g.hnfRules
treelet_fillers = []
for lhs in hnf_rules.keys():
rhs = hnf_rules[lhs]
for sym in rhs:
if len(sym) == 1 and self.is_bracketed(sym[0]):
curr_treelet = [lhs, sym[0]]
rhs_new = hnf_rules[sym[0]]
for sym_new in rhs_new:
curr_treelet.extend(sym_new)
treelet_fillers.append(curr_treelet)
self.treelets = treelet_fillers
def _construct_pairs(self):
self.pairs = []
for treelet in self.treelets:
self.pairs.append([[treelet[0], treelet[1]], '1_of_1'])
n_daughters = len(treelet) - 2
for ii in range(n_daughters):
self.pairs.append([[treelet[1], treelet[ii + 2]],
'%d_of_%d' % (ii + 1, n_daughters)])
def find(self, filler):
return 0
def find_mothers(self, filler):
return 0
def find_daughters(self, filler):
return 0
def subset_bracketed(self):
"""Return a list of bracketed filler symbols"""
pattern = re.compile('.*\[[0-9]+\]$')
fillers_bracketed = []
for filler in self.names:
if pattern.match(filler) is not None:
fillers_bracketed.append(filler)
return fillers_bracketed
def subset_unbracketed(self):
"""Return a list of bracketed filler symbols"""
fillers_unbracketed = [filler for filler in self.names
if filler not in self.subset_bracketed()]
return fillers_unbracketed
def subset_root(self):
return self.g.getRootNode(self.g.hnfRules)
def subset_terminals(self):
return self.g.getTerminalNodes(self.g.hnfRules)
def is_terminal(self, filler):
return filler in self.subset_terminals()
def is_bracketed(self, filler):
return filler in self.subset_bracketed()
def is_unbracketed(self, filler):
return filler in self.subset_unbracketed()
def is_root(self, filler):
return filler in self.subset_root()
def read_treelets(self):
for ii, treelet in enumerate(self.treelets):
temp = (treelet[0] + ' ( ' +
treelet[1] + ' ( ' + ' '.join(treelet[2:]) + ' ) )')
print(temp)
class SpanRoles(object):
def __init__(self, max_sent_len, max_num_branch=2):
'''Construct a SpanRole object.'''
self.max_sent_len = max_sent_len
self.max_num_branch = max_num_branch
self.use_terminal = False
self._generate()
self._sort()
self._name()
self._graph()
self._check_overlap()
self._construct_treelets()
self._construct_pairs()
def _generate(self):
roles = []
for num_branch in range(self.max_num_branch):
roles += list(itertools.combinations(
range(self.max_sent_len + 1), num_branch + 2))
if self.use_terminal:
roles_terminal = []
for role in roles:
if self.get_width(role) == 1:
roles_terminal.append((role[0], role[1], role[1]))
roles = roles + roles_terminal
self.roles = roles
self.num_roles = len(roles)
def sort_roles(self, roles_tuple):
roles_augmented = []
for role in roles_tuple:
roles_augmented.append(
(role, self.get_width(role),
self.get_num_branch(role), role[0]))
roles_augmented = sorted(
roles_augmented, key=lambda x: (x[1], -x[2], x[3]))
roles_tuple_sorted = [role_augmented[0] for role_augmented
in roles_augmented]
return roles_tuple_sorted
def _sort(self):
self.roles = self.sort_roles(self.roles)
def _name(self):
role_names = []
for role in self.roles:
role_names.append(self.tuple2str(role))
self.names = role_names
def _graph(self):
G = {}
G = OrderedDict(G)
for role_str in self.names:
G[role_str] = {}
G[role_str]['m'] = []
G[role_str]['d'] = []
for role_tuple in self.roles:
role_str = self.tuple2str(role_tuple)
if self.is_bracketed(role_tuple):
mother_tuple = self._find_mother(role_tuple)
mother_str = self.tuple2str(mother_tuple)
daughter_tuple = self._find_daughter(role_tuple)
daughter_str = [self.tuple2str(d) for d in daughter_tuple
if self.get_width(d) > 0]
G[role_str]['m'].append([mother_str])
G[mother_str]['d'].append([role_str])
G[role_str]['d'].append(daughter_str)
for ii, d in enumerate(daughter_str):
G[d]['m'].append([role_str])
if self.use_terminal and self.is_terminal(role_tuple):
mother_tuple = self._find_mother(role_tuple)
mother_str = self.tuple2str(mother_tuple)
G[role_str]['m'].append([mother_str])
G[mother_str]['d'].append([role_str])
self.G = G
def _check_overlap(self):
quant_list = []
for role_name in self.G:
if not self.is_bracketed(self.str2tuple(role_name)):
daughters = self.G[role_name]['d']
if len(daughters) > 0:
ds = [d[0] for d in daughters
if not self.is_terminal(self.str2tuple(d[0]))]
if len(ds) > 0:
quant_list.append(ds)
quant_list.append([role_name])
quant_list = sorted(
quant_list,
key=lambda x: (self.get_width(self.str2tuple(x[0])),
-self.get_num_branch(self.str2tuple(x[0])),
self.str2tuple(x[0])[0]))
self.quant_list = quant_list
def _construct_treelets(self):
treelet_roles = []
for role_str in self.G:
role_tuple = self.str2tuple(role_str)
if self.is_bracketed(role_tuple):
curr_treelet = []
mlist = self.G[role_str]['m']
if len(mlist) > 1:
sys.exit('CHECK!!!')
curr_treelet.extend(mlist[0])
curr_treelet.append(role_str)
dlist = self.G[role_str]['d']
curr_treelet.extend(dlist[0])
treelet_roles.append(curr_treelet)
self.treelets = treelet_roles
def _construct_pairs(self):
self.pairs = []
for treelet in self.treelets:
self.pairs.append([[treelet[0], treelet[1]], '1_of_1'])
n_daughters = len(treelet) - 2
for ii in range(n_daughters):
self.pairs.append([[treelet[1], treelet[ii + 2]],
'%d_of_%d' % (ii + 1, n_daughters)])
def _find_mother(self, role_tuple_bracketed):
if self.is_bracketed(role_tuple_bracketed):
return (role_tuple_bracketed[0], role_tuple_bracketed[-1])
if self.use_terminal and self.is_terminal(role_tuple_bracketed):
return (role_tuple_bracketed[0], role_tuple_bracketed[-1])
def _find_daughter(self, role_tuple_bracketed):
if self.is_bracketed(role_tuple_bracketed):
return [(role_tuple_bracketed[ii], role_tuple_bracketed[ii + 1])
for ii in range(len(role_tuple_bracketed) - 1)]
def str2tuple(self, role_str):
return tuple([int(pos) for pos in role_str[1:-1].split(',')])
def tuple2str(self, role_tuple):
return str(role_tuple).replace(' ', '')
def get_width(self, role_tuple):
return role_tuple[-1] - role_tuple[0]
def get_num_branch(self, role_tuple):
return len(role_tuple) - 1
def is_bracketed(self, role_tuple):
return (self.get_num_branch(role_tuple) > 1 and
self.get_width(role_tuple) > 1)
def is_terminal(self, role_tuple):
if self.use_terminal:
return (self.get_width(role_tuple) == 1 and
self.get_num_branch(role_tuple) == 2)
else:
return (self.get_width(role_tuple) == 1 and
self.get_num_branch(role_tuple) == 1)
def subset_terminals(self):
'''Return a list of terminal roles (tuple)'''
return [r for r in self.roles if self.is_terminal(r)]
def subset_bracketed(self):
'''Return a list of terminal roles (tuple)'''
return [r for r in self.roles if self.is_bracketed(r)]
def read_treelets(self):
for ii, treelet in enumerate(self.treelets):
temp = (treelet[0] + ' ( ' +
treelet[1] + ' ( ' + ' '.join(treelet[2:]) + ' ) )')
print(temp)
class HarmonicGrammar(object):
"""Construct an object implementing a Harmonic Grammar
[ [['S[1]/(0,1,2)', 'A/(0,1)'], 2.], # H(S[1]/(0,1,2), A/(0,1)) = 2
[['S/(0,2)', 'S[1]/(0,1,2)'], 2.],
[['A/(0,1)'], -1.], # H(A/(0,1)) = -1
...
... # For parsimony,
[['A'], -1.], # H(A/r) = -1 for all r in R, a set of roles
[['(0,1,2)'], -3.] # H(f/(0,1,2)) = -3 for all f in F, a set of fillers
...]
"""
def __init__(self, cfg, size, role_type="span_role",
add_null=True, match='pair', penalize='impossible',
null_bias=0., add_constraint=False, unary_base='filler'):
self.role_type = role_type
self.size = size
self.rules = []
self.filler_names = None
self.role_names = None
self.binding_names = None
self.num_branch = 2
self.bias_constraint = -10.
self.unused_binding_harmony = -10
self.null_bias = null_bias
self.grammar = grammar.Grammar(cfg)
self.fillers = Fillers(cfg, add_null=add_null)
self.filler_names = self.fillers.names
if role_type == "span_role":
self.roles = SpanRoles(
max_sent_len=size, max_num_branch=self.num_branch)
self.role_names = self.roles.names
self._set_binding_names()
self._construct_treelets(add_constraint)
self._construct_pairs()
self._convert_tuple_to_str()
self._add_rules_binary(match=match)
self._add_rules_unary(which=unary_base)
if add_constraint:
self._add_constraints(penalize=penalize)
def _add_constraints(self, penalize='unused'):
if penalize is 'unused':
bindings_used = []
for treelet in self.treelets:
[bindings_used.append(binding) for binding in treelet]
for binding in self.binding_names:
if binding not in bindings_used:
if binding.split('/')[0] != '_':
self.rules.append(
[[binding], self.unused_binding_harmony])
elif penalize is 'impossible':
for b in self.binding_names:
f, r = b.split('/')
r_tuple = self.roles.str2tuple(r)
if f is not self.fillers.null:
if f in self.fillers.subset_terminals():
if r_tuple not in self.roles.subset_terminals():
self.rules.append([[b], self.bias_constraint])
elif self.fillers.is_root(f):
if (r_tuple in self.roles.subset_terminals()) or \
(r_tuple in self.roles.subset_bracketed()):
self.rules.append([[b], self.bias_constraint])
elif f not in self.get_bracketed_fillers():
if (r_tuple in self.roles.subset_terminals()) or \
(r_tuple in self.roles.subset_bracketed()):
self.rules.append([[b], self.bias_constraint])
elif f in self.get_bracketed_fillers():
if r_tuple not in self.roles.subset_bracketed():
self.rules.append([[b], self.bias_constraint])
def _set_binding_names(self, sep='/'):
self.binding_names = [f + sep + r for r in self.roles.names
for f in self.fillers.names]
def _construct_treelets(self, add_constraint):
terminal_fillers = self.get_terminal_fillers()
terminal_roles = self.get_terminal_roles()
if self.fillers.null is not None:
null_filler = self.fillers.null
else:
null_filler = None
treelet_bindings = []
for treelet_f in self.fillers.treelets:
for treelet_r in self.roles.treelets:
if len(treelet_f) == len(treelet_r):
treelet_b = list(zip(treelet_f, treelet_r))
if add_constraint:
count = 0
for binding_index in range(len(treelet_b)):
is_terminal_f = (treelet_b[binding_index][0]
in terminal_fillers)
is_terminal_r = (treelet_b[binding_index][1]
in terminal_roles)
is_null_f = (treelet_b[binding_index][0] ==
null_filler)
if (is_terminal_f and is_terminal_r) or \
((not is_terminal_f) and (not is_terminal_r)) or (is_null_f):
count += 1
if count == len(treelet_b):
treelet_bindings.append(treelet_b)
else:
treelet_bindings.append(treelet_b)
self.treelets = treelet_bindings
def _construct_pairs(self):
pair_bindings = []
for pair_f in self.fillers.pairs:
for pair_r in self.roles.pairs:
if pair_f[1] == pair_r[1]:
pair_bindings.append(list(zip(pair_f[0], pair_r[0])))
self.pairs = pair_bindings
def _convert_tuple_to_str(self):
for ii, treelet in enumerate(self.treelets):
for jj, binding in enumerate(treelet):
self.treelets[ii][jj] = binding[0] + '/' + binding[1]
for ii, pair in enumerate(self.pairs):
for jj, binding in enumerate(pair):
self.pairs[ii][jj] = binding[0] + '/' + binding[1]
def _add_rules_binary(self, match='pair'):
"""Add binary HG rules"""
if match == 'treelet':
for treelet in self.treelets:
self.rules.append([[treelet[0], treelet[1]], 2.0])
for ii in range(len(treelet) - 2):
self.rules.append([[treelet[1], treelet[ii + 2]], 2.0])
elif match == 'pair':
for pair in self.pairs:
self.rules.append([[pair[0], pair[1]], 2.0])
def _add_rules_unary(self, which="filler"):
"""Add unary HG rules"""
for ii, filler in enumerate(self.fillers.names):
if filler in self.fillers.subset_bracketed():
self.rules.append([[filler], -1.0 - self.num_branch])
elif filler in self.fillers.subset_terminals():
self.rules.append([[filler], -1.0])
elif filler in self.fillers.subset_root():
self.rules.append([[filler], -1.0])
elif filler == self.fillers.null:
self.rules.append([[filler], self.null_bias])
else:
self.rules.append([[filler], -2.0])
def read_rules(self):
print("Binary rules:\n")
for rule in self.rules:
if len(rule[0]) == 2:
print('H(' + rule[0][0] + ', ' + rule[0][1] + ') = %.1f' % rule[1])
print("\nUnary rules:\n")
for rule in self.rules:
if len(rule[0]) == 1:
if rule[0][0] in self.fillers.names:
print('H(' + rule[0][0] + '/*) = %.1f' % rule[1])
elif rule[0][0] in self.roles.names:
print('H(*/' + rule[0][0] + ') = %.1f' % rule[1])
elif rule[0][0] in self.binding_names:
print('H(' + rule[0][0] + ') = %.1f' % rule[1])
else:
return 0
def _add_rules_root(self):
root_fillers = self.grammar.getRootNode(self.grammar.hnfRules)
if not isinstance(root_fillers, list):
root_fillers = [root_fillers]
for filler in root_fillers:
self.rules.append([[filler], -1.])
return 0
def reorder_fillers(self, fillers_ordered):
if self.fillers.null is not None:
if not (self.fillers.null in fillers_ordered):
fillers_ordered.append(self.fillers.null)
if set(self.fillers.names) == set(fillers_ordered):
self.fillers.names = fillers_ordered
else:
sys.exit("The set of filler names constructued ffrom grammar is not same as the set of filler names you provided.")
self._set_binding_names()
def get_binary(self):
return 0
def get_unary(self):
return 0
def sort(self):
return 0
def get_terminal_fillers(self):
terminal_fillers = self.grammar.getTerminalNodes(self.grammar.hnfRules)
terminal_fillers.sort()
return terminal_fillers
def get_bracketed_fillers(self):
"""Return a list of bracketed filler symbols"""
pattern = re.compile('.*\[[0-9]+\]$')
fillers_bracketed = []
for key in self.grammar.hnfRules.keys():
if pattern.match(key) is not None:
fillers_bracketed.append(key)
return fillers_bracketed
def get_terminal_roles(self):
return [str(role).replace(' ', '') for role in self.roles.subset_terminals()]
def get_bracketed_roles(self):
return [str(role).replace(' ', '') for role in self.roles.subset_bracketed()]
class GscNet(object):
def __init__(self, hg=None, encodings=None, opts=None, seed=None):
self._set_opts()
self._update_opts(opts=opts)
self._set_encodings()
self._update_encodings(encodings=encodings)
if seed is not None:
self.set_seed(seed)
self.seed = seed
self.hg = hg
self._add_names()
self._generate_encodings()
self._compute_TPmat()
self.WC = np.zeros((self.num_bindings, self.num_bindings))
self.bC = np.zeros(self.num_bindings)
if hg is not None:
self._build_model(hg)
self._set_weights()
self._set_biases()
self._set_quant_list()
self.actC = np.zeros(self.num_bindings)
self.actC_prev = np.zeros(self.num_bindings)
self.act = self.C2N()
self.act_prev = self.C2N(actC=self.actC_prev)
self.extC = np.zeros(self.num_bindings)
self.extC_prev = np.zeros(self.num_bindings)
self.ext = self.C2N(actC=self.extC)
self.ext_prev = self.C2N(actC=self.extC_prev)
if isinstance(self.opts['bowl_center'], numbers.Number):
self.bowl_center = (self.opts['bowl_center'] *
np.ones(self.num_bindings))
else:
self.bowl_center = self.opts['bowl_center']
if self.opts['bowl_strength'] is None:
self.opts['bowl_strength'] = (
self._compute_recommended_bowl_strength() +
self.opts['beta_min_offset'])
else:
self.check_bowl_strength()
self.bowl_strength = self.opts['bowl_strength']
self.zeta = self.C2N(actC=self.bowl_center)
self.t = 0
self.speed = None
self.ema_speed = None
self.clamped = False
self.q = self.opts['q_init']
self.T = self.opts['T_init']
self.dt = self.opts['dt']
self.reset()
def _set_quant_list(self):
quant_list = []
for role_name in self.role_names:
quant_list.append(self.find_roles(role_name))
self.quant_list = quant_list
def _set_opts(self):
"""Set option variables to default values."""
self.opts = {}
self.opts['trace_varnames'] = [
'act', 'H', 'H0', 'Q', 'q', 'T', 't', 'ema_speed', 'speed']
self.opts['norm_ord'] = np.inf
self.opts['coord'] = 'N'
self.opts['ema_factor'] = 0.001
self.opts['ema_tau'] = -1 / np.log(self.opts['ema_factor'])
self.opts['T_init'] = 1e-3
self.opts['T_min'] = 0.
self.opts['T_decay_rate'] = 1e-3
self.opts['q_init'] = 0.
self.opts['q_max'] = 200.
self.opts['q_rate'] = 10.
self.opts['c'] = 0.5
self.opts['bowl_center'] = 0.5
self.opts['bowl_strength'] = None
self.opts['beta_min_offset'] = 0.1
self.opts['dt'] = 0.001
self.opts['H0_on'] = True
self.opts['H1_on'] = True
self.opts['Hq_on'] = True
self.opts['max_dt'] = 0.01
self.opts['min_dt'] = 0.0005
self.opts['q_policy'] = None
def _update_opts(self, opts):
"""Update option variable values"""
if opts is not None:
for key in opts:
if key in self.opts:
self.opts[key] = opts[key]
if key == 'ema_factor':
self.opts['ema_tau'] = -1 / np.log(self.opts[key])
if key == 'ema_tau':
self.opts['ema_factor'] = np.exp(-1 / self.opts[key])
else:
sys.exit('Check [opts]')
def _set_encodings(self):
"""Set encoding variables to default values."""
self.encodings = {}
self.encodings['dp_f'] = 0.
self.encodings['dp_r'] = 0.
self.encodings['coord_f'] = 'dist'
self.encodings['coord_r'] = 'dist'
self.encodings['dim_f'] = None
self.encodings['dim_r'] = None
self.encodings['filler_names'] = None
self.encodings['role_names'] = None
self.encodings['F'] = None
self.encodings['R'] = None
self.encodings['similarity'] = None
def _update_encodings(self, encodings):
"""Update encoding variables"""
if encodings is not None:
for key in encodings:
if key in self.encodings:
self.encodings[key] = encodings[key]
def _add_names(self):
"""Add filler, role, and binding names to the GscNet object"""
if self.hg is None:
if self.encodings['filler_names'] is None:
sys.exit("Please provide a list of filler names.")
if self.encodings['role_names'] is None:
sys.exit("Please provide a list of role names.")
self.filler_names = self.encodings['filler_names']
self.role_names = self.encodings['role_names']
self.binding_names = [f + '/' + r for r in self.role_names for f in self.filler_names]
else:
if isinstance(self.hg, HarmonicGrammar):
self.filler_names = self.hg.fillers.names
self.role_names = self.hg.roles.names
self.binding_names = self.hg.binding_names
else:
sys.exit('[hg] is not an instance of HarmonicGrammar class.')
self.num_fillers = len(self.filler_names)
self.num_roles = len(self.role_names)
self.num_bindings = len(self.binding_names)
def _generate_encodings(self):
"""Generate vector encodings of fillers, roles, and their bindings"""
if self.encodings['similarity'] is not None:
dp_f = np.diag(np.ones(self.num_fillers))
dp_r = np.diag(np.ones(self.num_roles))
for dp in self.encodings['similarity']:
if all(sym in self.filler_names for sym in dp[0]):
dp_f[self.filler_names.index(dp[0][0]), self.filler_names.index(dp[0][1])] = dp[1]
dp_f[self.filler_names.index(dp[0][1]), self.filler_names.index(dp[0][0])] = dp[1]
elif all(sym in self.role_names for sym in dp[0]):
dp_r[self.role_names.index(dp[0][0]), self.role_names.index(dp[0][1])] = dp[1]
dp_r[self.role_names.index(dp[0][1]), self.role_names.index(dp[0][0])] = dp[1]
else:
sys.exit('Cannot find some f/r bindings in your similarity list.')
self.encodings['dp_f'] = dp_f
self.encodings['dp_r'] = dp_r
self.F = encode_symbols(
self.num_fillers,
coord=self.encodings['coord_f'],
dp=self.encodings['dp_f'],
dim=self.encodings['dim_f'])
self.R = encode_symbols(
self.num_roles,
coord=self.encodings['coord_r'],
dp=self.encodings['dp_r'],
dim=self.encodings['dim_r'])
if self.encodings['F'] is not None:
self.F = self.encodings['F']
if self.encodings['R'] is not None:
self.R = self.encodings['R']
self.dim_f = self.F.shape[0]
self.dim_r = self.R.shape[0]
self.num_units = self.dim_f * self.dim_r
ndigits = len(str(self.num_units))
self.unit_names = ['U' + str(ii+1).zfill(ndigits) for ii in list(range(self.num_units))]
def _build_model(self, hg):
"""Set the weight and bias values using a HarmonicGrammar object [hg]."""
if isinstance(hg, HarmonicGrammar):
for rule in hg.rules:
if len(rule[0]) == 2:
self.set_weight(rule[0][0], rule[0][1], rule[1])
elif len(rule[0]) == 1:
if rule[0][0] in self.binding_names:
self.set_bias(rule[0][0], rule[1])
elif rule[0][0] in self.filler_names:
self.set_filler_bias(rule[0][0], rule[1])
elif rule[0][0] in self.role_names:
self.set_role_bias(rule[0][0], rule[1])
else:
sys.exit('Check the rule in your Harmonic Grammar:' + rule)
else:
sys.exit('The given grammar as hg is not an instance of HarmonicGrammar class.')
if np.allclose(self.W, self.W.T) == False:
sys.exit("The weight matrix (2D array) is not symmetric. Please check it.")
def _compute_TPmat(self):
"""Compute the matrices of change of basis from conceptual to neural and from neural to conceptual coordinates.
"""
TP = np.kron(self.R, self.F)
if TP.shape[0] == TP.shape[1]:
TPinv = linalg.inv(TP)
else:
TPinv = linalg.pinv(TP)
self.TP = TP
self.TPinv = TPinv
self.Gc = self.TPinv.T.dot(self.TPinv)
def _set_weights(self):
"""Compute the weight values in the neural space (distributed representation)"""
self.W = self.TPinv.T.dot(self.WC).dot(self.TPinv)
def _set_biases(self):
"""Compute the bias values in the neural space (distributed representation)"""
self.b = self.TPinv.T.dot(self.bC)
def _compute_recommended_bowl_strength(self):
"""Compute the recommended value of bowl strength. Note that the value depends on external input."""
eigvals, eigvecs = np.linalg.eigh(self.WC)
eig_max = max(eigvals)
if np.sum(abs(self.bowl_center)) > 0:
if self.num_bindings == 1:
beta1 = -(self.bC + self.extC) / self.bowl_center
beta2 = (self.bC + self.extC + eig_max) / (1 - self.bowl_center)
else:
beta1 = -min((self.bC + self.extC) /self.bowl_center)
beta2 = max((self.bC + self.extC + eig_max) / (1 - self.bowl_center))
val = max(eig_max, beta1, beta2)
else:
val = eig_max
return val
def check_bowl_strength(self, disp=True):
'''Compute and print the recommended beta value
given the weights and biases in the C-space.'''
beta_min = self._compute_recommended_bowl_strength()
if self.opts['bowl_strength'] <= beta_min:
sys.exit("Bowl strength should be greater than %.4f." % beta_min)
if disp:
print('(Current bowl strength: %.3f) must be greater than (minimum: %.3f)' % (self.opts['bowl_strength'], beta_min))
def update_bowl_center(self, bowl_center):
"""Update the bowl center
Usage:
>>> net.update_bowl_center(0.3) # Set the bowl center to 0.3 * \vec{1}
>>>
>>> import numpy as np
>>> bowl_center = np.random.random(net.num_bindings)
>>> net.update_bowl_center(bowl_center)
: bowl_center: float or 1d NumPy array (size=number of bindings). the bowl center.
"""
if not (isinstance(bowl_center, np.ndarray) or isinstance(bowl_center, numbers.Number)):
sys.exit('You must provide a scalar or a NumPy array as bowl_center.')
if isinstance(bowl_center, numbers.Number):
bowl_center = bowl_center * np.ones(self.num_bindings)
if bowl_center.shape[0] != self.num_bindings:
sys.exit('When you provide a NumPy array as bowl_center, it must have the same number of elements as the number of f/r bindings.')
self.bowl_center = bowl_center
self.zeta = self.C2N(actC=self.bowl_center)
def update_bowl_strength(self, bowl_strength=None):
"""Replace the current bowl strength with
the recommended bowl strength (+ offset)
Usage:
>>> net = gsc.GscNet(...)
>>> net.set_weight('a/(0,1)', 'b/(1,2)', 2.0)
>>> net.update_bowl_strength()
: bowl_strength : float or None (=default)
"""
if bowl_strength is None:
self.opts['bowl_strength'] = (
self._compute_recommended_bowl_strength() +
self.opts['beta_min_offset'])
else:
self.opts['bowl_strength'] = bowl_strength
self.bowl_strength = self.opts['bowl_strength']
def set_weight(self, binding_name1, binding_name2, weight, symmetric=True):
'''Set the weight of a connection between binding1 and binding2.
When symmetric is set to True (default), the connection weight from
binding2 to binding1 is set to the same value.'''
idx1 = self.find_bindings(binding_name1)
idx2 = self.find_bindings(binding_name2)
if symmetric:
self.WC[idx1, idx2] = self.WC[idx2, idx1] = weight
else:
self.WC[idx2, idx1] = weight
self._set_weights()
def set_bias(self, binding_name, bias):
'''Set bias values of [binding_name] to [bias]'''
idx = self.find_bindings(binding_name)
self.bC[idx] = bias
self._set_biases()
def set_filler_bias(self, filler_name, bias):
'''Set the bias of bindings of all roles
with particular fillers to [bias].'''
filler_list = [bb.split('/')[0] for bb in self.binding_names]
if not isinstance(filler_name, list):
filler_name = [filler_name]
for jj, filler in enumerate(filler_name):
idx = [ii for ii, ff in enumerate(filler_list) if filler == ff]
self.bC[idx] = bias
self._set_biases()
def set_role_bias(self, role_name, bias):
'''Set the bias of bindings of all fillers
with particular roles to [bias].'''
role_list = [bb.split('/')[1] for bb in self.binding_names]
if not isinstance(role_name, list):
role_name = [role_name]
for jj, role in enumerate(role_name):
idx = [ii for ii, rr in enumerate(role_list) if role == rr]
self.bC[idx] = bias
self._set_biases()
def find_bindings(self, binding_names):
'''Find the indices of the bindings from the list of binding names.'''
if not isinstance(binding_names, list):
binding_names = [binding_names]
return [self.binding_names.index(bb) for bb in binding_names]
def find_fillers(self, filler_name):
if not isinstance(filler_name, list):
filler_name = [filler_name]
filler_list = [bb.split('/')[0] for bb in self.binding_names]
filler_idx = []
for jj, filler in enumerate(filler_name):
idx = [ii for ii, ff in enumerate(filler_list) if filler == ff]
filler_idx += idx
return filler_idx
def find_roles(self, role_name):
if not isinstance(role_name, list):
role_name = [role_name]
role_list = [bb.split('/')[1] for bb in self.binding_names]
role_idx = []
for jj, role in enumerate(role_name):
idx = [ii for ii, rr in enumerate(role_list) if role == rr]
role_idx += idx
return role_idx
def vec2mat(self, actC=None):
'''Convert an activation state vector to a matrix form
in which each row corresponds to a filler
and each column corresponds to a role.'''
if actC is None:
actC = self.S2C()
return actC.reshape(self.num_fillers, self.num_roles, order='F')
def C2N(self, actC=None):
'''Change basis: from conceptual/pattern to neural space.'''
if actC is None:
actC = self.actC
return self.TP.dot(actC)
def N2C(self, act=None):
'''Change basis: from neural to conceptual/pattern space.'''
if act is None:
act = self.act
return self.TPinv.dot(act)
def read_weight(self, which='WC'):
'''Print the weight matrix in a readable format
(in the pattern coordinate).'''
if which[-1] == 'C':
print(pd.DataFrame(
getattr(self, which), index=self.binding_names,
columns=self.binding_names))
else:
print(pd.DataFrame(
getattr(self, which), index=self.unit_names,
columns=self.unit_names))
def read_bias(self, which='bC', print_vertical=True):
'''Print the bias vector (in the pattern coordinate).'''
if which[-1] == 'C':
if print_vertical:
print(pd.DataFrame(
getattr(self, which).reshape(self.num_bindings, 1),
index=self.binding_names, columns=["bias"]))
else:
print(pd.DataFrame(
getattr(self, which).reshape(1, self.num_bindings),
index=["bias"], columns=self.binding_names))
else:
if print_vertical:
print(pd.DataFrame(
getattr(self, which).reshape(self.num_bindings, 1),
index=self.unit_names, columns=["bias"]))
else:
print(pd.DataFrame(
getattr(self, which).reshape(1, self.num_bindings),
index=["bias"], columns=self.unit_names))
def read_state(self, act=None):
'''Print the current state (C-SPACE) in a readable format.
Pandas should be installed.'''
if act is None:
act = self.act
actC = self.vec2mat(self.N2C(act))
print(pd.DataFrame(
actC, index=self.filler_names, columns=self.role_names))
def read_grid_point(self, act=None, disp=True, skip=True):
'''Print a grid point close to the current state. The grid point will be
chosen by the snap-it method: a filler with the highest activation
value in each role will be chosen.'''
act_min = 0.5
if act is None:
act = self.act
actC = self.vec2mat(self.N2C(act))
winner_idx = np.argmax(actC, axis=0)
winners = [self.filler_names[ii] for ii in winner_idx]
winners = ["%s/%s" % bb for bb in zip(winners, self.role_names)]
if skip:
roles = [role_num for role_num, filler_num in enumerate(winner_idx)
if (actC[filler_num, role_num] > act_min and
self.filler_names[filler_num] is not self.hg.fillers.null)]
winners = [winners[r] for r in roles]
if disp:
print(winners)
return winners
def get_grid_points(self, n=1e7):
"""Get a list of the top [n] grid points with high harmony values
and compute H_0 at every grid point. Regardless of the [n] value,
the program will check every grid point. Note that the number of
grid points is [num_fillers]^[num_roles] which increases explosively
as [num_roles] increases. This method works only when the total
number of grid points is reasonably small (currently set to 1e7)."""
if self.num_fillers ** self.num_roles > 1e7:
sys.exit('There are too many grid points (= %d) to check all grid points.' % self.num_fillers ** self.num_roles)
if self.num_fillers ** self.num_roles < n:
n = self.num_fillers ** self.num_roles
quant_list = [None] * self.num_roles
for rind, role in enumerate(self.role_names):
quant_list[rind] = [self.binding_names[ii] for ii in self.find_roles(role)]
gpset = []
gpset_h = np.zeros(n)
if self.num_fillers ** self.num_roles > 10000:
print('Of %d grid points: ' % self.num_fillers ** self.num_roles)
for ii, gp in enumerate(itertools.product(*quant_list)):
if (ii + 1) % 1e4 == 0:
print('[%06d]' % (ii + 1), end='')
if (ii + 1) % 1e5 == 0:
print('')
gp = list(gp)
self.set_state(gp)
hh = self.H0()
if ii < n:
gpset_h[ii] = hh
gpset.append(gp)
else:
if hh > np.min(gpset_h):
gpset[np.argmin(gpset_h)] = gp
gpset_h[np.argmin(gpset_h)] = hh
idx = np.argsort(gpset_h)[::-1]
self.gpset = [gpset[ii] for ii in idx]
self.gpset_h = gpset_h[idx]
def set_seed(self, num):
'''Set a random number seed.'''
np.random.seed(num)
def H(self, act=None):
"""Evalutate total harmony"""
return self.Hg(act) + float(self.opts['Hq_on']) * self.q * self.Qa(act)
def Hg(self, act=None):
"""Evalutate H_G (= H0 + H1)"""
return (float(self.opts['H0_on']) * self.H0(act) +
float(self.opts['H1_on']) * self.H1(act))
def H0(self, act=None):
"""Evaluate H0"""
if act is None:
act = self.act
return 0.5 * act.dot(self.W).dot(act) + (self.b + self.ext).dot(act)
def H1(self, act=None):
"""Evalutate H1 (bowl harmony)"""
if act is None:
act = self.act
return (self.bowl_strength *
(-0.5 * (act - self.zeta).T.dot(self.Gc).dot(act - self.zeta)))
def Q(self, act=None):
"""Evaluate quantization harmony Q = c * Q0 + (1-c) * Q1"""
return (self.opts['c'] * self.Q0(act) +
(1 - self.opts['c']) * self.Q1(act))
def Qa(self, act=None):
"""Evaluate quantization harmony Q = c * Q0 + (1-c) * Q1"""
return self.opts['c'] * self.Q0(act) + (1-self.opts['c']) * self.Q1a(act=act, quant_list=self.quant_list)
def Qb(self, act=None):
"""Evaluate quantization harmony Q = c * Q0 + (1-c) * Q1"""
return self.opts['c'] * self.Q0(act) + (1-self.opts['c']) * self.Q1b(act=act, quant_list=self.quant_list)
def Q0(self, act=None):
"""Evaluate Q0"""
if act is None:
act = self.act
actC = self.N2C(act)
return -np.sum(actC**2 * (1 - actC)**2)
def Q1(self, act=None):
"""Evaluate Q1"""
if act is None:
act = self.act
return -np.sum((np.sum(self.vec2mat(self.N2C(act))**2, axis=0) - 1)**2)
def Q1a(self, act=None, quant_list=None):
"""Evaluate Q1 (sum of squared = 1)"""
if act is None:
act = self.act
if quant_list is None:
quant_list = self.quant_list
actC = self.N2C(act)
q1 = 0
for qlist in quant_list:
ssq = actC[qlist].dot(actC[qlist])
q1 += (ssq - 1)**2
return -q1
def Q1b(self, act=None, quant_list=None):
"""Evaluate Q1 (sum of squared = 0 or 1)"""
if act is None:
act = self.act
if quant_list is None:
quant_list = self.quant_list
actC = self.N2C(act)
q1 = 0
for qlist in quant_list:
ssq = actC[qlist].dot(actC[qlist])
q1 += (ssq - 1)**2 * ssq**2
return -q1
def HGrad(self, act=None):
'''Compute the harmony gradient evaluated at the current state'''
return self.HgGrad(act) + float(self.opts['Hq_on']) * self.q * self.QaGrad(act)
def HgGrad(self, act=None):
"""Compute the gradient of grammar harmony H_G"""
return (float(self.opts['H0_on']) * self.H0Grad(act) +
float(self.opts['H1_on']) * self.H1Grad(act))
def H0Grad(self, act=None):
if act is None:
act = self.act
return self.W.dot(act) + self.b + self.ext
def H1Grad(self, act=None):
if act is None:
act = self.act
return (self.bowl_strength *
(-self.Gc.dot(act) + self.Gc.dot(self.zeta)))
def QGrad(self, act=None):
return (self.opts['c'] * self.Q0Grad(act) +
(1 - self.opts['c']) * self.Q1Grad(act))
def QaGrad(self, act=None):
return self.opts['c'] * self.Q0Grad(act) + (1-self.opts['c']) * self.Q1aGrad(act=act, quant_list=self.quant_list)
def QbGrad(self, act=None):
return self.opts['c'] * self.Q0Grad(act) + (1-self.opts['c']) * self.Q1bGrad(act=act, quant_list=self.quant_list)
def Q0Grad(self, act=None):
if act is None:
act = self.act
actC = self.N2C(act)
g = 2 * actC * (1 - actC) * (1 - 2 * actC)
return -np.einsum('ij,i', self.TPinv, g)
def Q1Grad(self, act=None):
"""Compute the gradient of quantization harmony (Q1)"""
if act is None:
act = self.act
TPinv_reshaped = self.TPinv.reshape(
(self.num_fillers, self.num_roles, self.num_units), order='F')
actC = self.N2C(act)
amat = self.vec2mat(actC)
term1 = np.einsum('ij->j', amat**2) - 1
term2 = np.einsum('ij,ijk->jk', amat, TPinv_reshaped)
return -4 * np.einsum('j,jk', term1, term2)
def Q1aGrad(self, act=None, quant_list=None):
"""Compute the gradient of quantization harmony (Q1)"""
if act is None:
act = self.act
if quant_list is None:
quant_list = self.quant_list
actC = self.N2C(act)
q1grad = 0
for qlist in quant_list:
curr_actC = actC[qlist]
curr_TPinv = self.TPinv[qlist, :]
curr_term1 = (curr_actC**2).sum() - 1
curr_term2 = np.einsum('i,ij->j', curr_actC, curr_TPinv)
q1grad += curr_term1 * curr_term2
return -4 * q1grad
def Q1bGrad(self, act=None, quant_list=None):
"""Compute the gradient of quantization harmony (Q1)"""
if act is None:
act = self.act
if quant_list is None:
quant_list = self.quant_list
actC = self.N2C(act)
q1grad = 0
for qlist in quant_list:
curr_actC = actC[qlist]
curr_TPinv = self.TPinv[qlist, :]
ssq = curr_actC.dot(curr_actC)
curr_term1 = ssq * (ssq - 1) * (2*ssq - 1)
curr_term2 = np.einsum('i,ij->j', curr_actC, curr_TPinv)
q1grad += curr_term1 * curr_term2
return -4 * q1grad
def initialize_traces(self, trace_list):
"""Create storage for traces."""
if trace_list == 'all':
trace_list = self.opts['trace_varnames']
else:
if not isinstance(trace_list, list):
sys.exit(('Check [trace_list] that should be a list object. \n'
'If you want to log a single variable (e.g., H), \n'
'you must provide ["H"], not "H", as the value of [trace_list].'))
var_not_in_varnames = [var for var in trace_list if var not in self.opts['trace_varnames']]
if len(var_not_in_varnames) > 0:
sys.exit(('Check [trace_list]. You provided variable name(s) that are not availalbe in the software.\n'
'Currently, the following variables are available:\n' + self.opts['trace_varnames']))
if hasattr(self, 'traces'):
for key in trace_list:
self.traces[key] = list(self.traces[key])
else:
self.traces = {}
for key in trace_list:
self.traces[key] = []
self.update_traces()
def update_traces(self):
"""Log traces"""
if 'act' in self.traces:
self.traces['act'].append(list(self.act))
if 'H' in self.traces:
self.traces['H'].append(self.H())
if 'H0' in self.traces:
self.traces['H0'].append(self.H0())
if 'Q' in self.traces:
self.traces['Q'].append(self.Q())
if 'q' in self.traces:
self.traces['q'].append(self.q)
if 't' in self.traces:
self.traces['t'].append(self.t)
if 'T' in self.traces:
self.traces['T'].append(self.T)
if 'ema_speed' in self.traces:
self.traces['ema_speed'].append(self.ema_speed)
if 'speed' in self.traces:
self.traces['speed'].append(self.speed)
def finalize_traces(self):
"""Convert list objects of traces to NumPy array objects."""
for key in self.traces:
self.traces[key] = np.array(self.traces[key])
def _compute_projmat(self, A):
"""Compute a projection matrix of a given matrix A. A is an n x m
matrix of basis (column) vectors of the subspace. This function
works only when the rank of A is equal to the nunmber of columns of A.
"""
return A.dot(linalg.inv(A.T.dot(A))).dot(A.T)
def clamp(self, binding_names,
clamp_vals=1.0, clamp_comp=False):
'''Clamp f/r bindings to [clamp_vals]'''
if not isinstance(clamp_vals, list):
clamp_vals = [clamp_vals]
if not isinstance(binding_names, list):
binding_names = [binding_names]
if len(clamp_vals) > 1:
if len(clamp_vals) != len(binding_names):
sys.exit('The number of bindings clamped is not equal to the number of values provided.')
self.clamped = True
self.binding_names_clamped = binding_names
clampvecC = np.zeros(self.num_bindings)
if clamp_comp:
role_names = [b.split('/')[1] for b in binding_names]
idx1 = self.find_roles(role_names)
clampvecC[idx1] = 0.0
idx = self.find_bindings(binding_names)
clampvecC[idx] = clamp_vals
self.clampvecC = clampvecC
if clamp_comp:
idx += idx1
idx.sort()
idx0 = [bb for bb in np.arange(self.num_bindings) if bb not in idx]
A = self.TP[:, idx0]
if len(idx0) > 0:
self.projmat = self._compute_projmat(A)
else:
self.projmat = np.zeros((self.num_units, self.num_units))
self.clampvec = self.C2N(clampvecC)
self.act = self.act_clamped(self.act)
self.actC = self.N2C()
def unclamp(self):
"""Unclamp"""
if self.clamped is True:
del self.clampvec
del self.clampvecC
del self.projmat
del self.binding_names_clamped
self.clamped = False
def act_clamped(self, act=None):
"""Get a new activation vector after projecting an activation vector
to a subspace."""
if act is None:
act = self.act
return self.projmat.dot(act) + self.clampvec
def set_input(self, binding_names, ext_vals, inhib_comp=False):
'''Set external input.'''
if not isinstance(ext_vals, list):
ext_vals = [ext_vals]
if not isinstance(binding_names, list):
binding_names = [binding_names]
if len(ext_vals) > 1:
if len(binding_names) != len(ext_vals):
sys.exit("binding_names and ext_vals have different lengths.")
self.clear_input()
idx = self.find_bindings(binding_names)
self.extC[idx] = ext_vals
self.ext = self.C2N(self.extC)
def clear_input(self):
'''Remove external input'''
self.extC = np.zeros(self.num_bindings)
self.ext = self.C2N(self.extC)
def reset(self):
'''Reset the model. q and T will be set to their initial values'''
self.q = self.opts['q_init']
self.T = self.opts['T_init']
self.t = 0
self.randomize_state()
self.actC = self.N2C()
self.extC_prev = np.zeros(self.num_bindings)
self.ext_prev = self.TP.dot(self.extC_prev)
self.unclamp()
self.clear_input()
if hasattr(self, 'traces'):
del self.traces
def set_state(self, binding_names, vals=1.0):
"""Set state to a particular vector at which the activation values
of the given bindings are set to [vals] (default=1.0)
and the activation values of the other bindings are set to 0."""
idx = self.find_bindings(binding_names)
self.actC = np.zeros(self.num_bindings)
self.actC[idx] = vals
self.act = self.C2N()
def set_init_state(self, mu=0.5, sd=0.2):
"""Set initial state"""
self.actC = np.random.normal(loc=mu, scale=sd, size=self.num_bindings)
self.act = self.C2N()
def randomize_state(self, minact=0, maxact=1):
'''Set the activation state to a random vector
inside a hypercube of [minact, maxact]^num_bindings'''
self.actC = np.random.uniform(minact, maxact, self.num_bindings)
self.act = self.C2N(self.actC)
def run(self, duration, update_T=True, update_q=True, log_trace=True,
trace_list='all', plot=False, tol=None, testvar='ema_speed',
grayscale=False, colorbar=True):
'''Run simulations for a given amount of time [time].'''
self.converged = False
t_max = self.t + duration
self.step = 0
if log_trace:
self.initialize_traces(trace_list)
while self.t < t_max:
self.update(update_T=update_T, update_q=update_q)
if log_trace:
self.update_traces()
if tol is not None:
self.check_convergence(tol=tol, testvar=testvar)
if self.converged:
break
self.rt = self.t
self.extC_prev[:] = self.extC
self.ext_prev[:] = self.TP.dot(self.extC_prev)
if log_trace:
self.finalize_traces()
if log_trace and plot:
actC_trace = self.N2C(self.traces['act'].T).T
times = self.traces['t']
times_new = np.linspace(times[0], times[-1], times.shape[0])
actC_trace_new = []
for b_ind in range(actC_trace.shape[1]):
actC_trace_new.append(
np.interp(times_new, times, actC_trace[:, b_ind]))
actC_trace_new = np.array(actC_trace_new).T
heatmap(
actC_trace_new.T,
xlabel="Time", xtick=False,
ylabel="Bindings", yticklabels=self.binding_names,
grayscale=grayscale, colorbar=colorbar, val_range=[0, 1])
def update(self, update_T=True, update_q=True):
"""Update state, speed, ema_speed (and optionally T, q, dt)"""
self.act_prev[:] = self.act
self.actC_prev[:] = self.actC
self.update_state()
self.update_speed()
if update_T and (self.opts['T_decay_rate'] > 0):
self.update_T()
if update_q:
self.update_q()
def update_state(self):
'''Update state (with noise)'''
grad = self.HGrad()
grad_mag = np.sqrt(grad.dot(grad))
if grad_mag > 0:
self.dt = min(self.opts['max_dt'], self.opts['max_dt'] / grad_mag)
self.dt = max(self.opts['min_dt'], self.dt)
self.t += self.dt
self.act += self.dt * grad
self.add_noise()
if self.clamped:
self.act = self.act_clamped()
self.actC = self.N2C()
def add_noise(self):
'''Add noise to state in neural coordinates.'''
self.act += (np.sqrt(2 * self.T * self.dt) *
np.random.randn(self.num_units))
def update_T(self):
'''Update temperature'''
self.T = (np.exp(-self.opts['T_decay_rate'] * self.dt) *
(self.T - self.opts['T_min']) + self.opts['T_min'])
def update_q(self):
'''Update quantization strength'''
if self.opts['q_policy'] is not None:
self.q = np.interp(
self.t, self.opts['q_policy'][:, 0], self.opts['q_policy'][:, 1])
else:
self.q = max(min(self.q + self.opts['q_rate'] *
self.dt, self.opts['q_max']), 0)
def update_speed(self):
"""Update speed and ema_speed"""
if self.opts['coord'] == 'N':
diff = self.act - self.act_prev
elif self.opts['coord'] == 'C':
diff = self.actC - self.actC_prev
self.speed = linalg.norm(
diff, ord=self.opts['norm_ord']) / abs(self.dt)
if self.ema_speed is None:
self.ema_speed = self.speed
else:
ema_weight = self.opts['ema_factor'] ** abs(self.dt)
self.ema_speed = (ema_weight * self.ema_speed +
(1 - ema_weight) * self.speed)
def check_convergence(self, tol, testvar='ema_speed'):
'''Check if the convergence criterion (distance vs. ema_speed) has been satisfied.'''
if testvar == 'ema_speed':
if self.ema_speed < tol:
self.converged = True
if testvar == 'Q':
if self.Q() > tol:
self.converged = True
def plot_state(self, act=None, actC=None, coord='C',
colorbar=True, disp=True, grayscale=True):
"""Plot the activation state (conceptual coordinate) in a heatmap."""
if (act is None) and (actC is None):
act = self.act
actC = self.actC
elif (act is None) and (actC is not None):
act = self.C2N(actC)
elif (act is not None) and (actC is None):
actC = self.N2C(act)
else:
sys.exit('Error. You must pass either act or actC but not both to the function.')
if coord == 'C':
heatmap(
self.vec2mat(actC), xticklabels=self.role_names,
yticklabels=self.filler_names, grayscale=grayscale,
colorbar=colorbar, disp=disp, val_range=[0, 1])
elif coord == 'N':
act_mat = act.reshape((self.dim_f, self.dim_r), order='F')
yticklabels = ['f' + str(ii) for ii in range(self.dim_f)]
xticklabels = ['r' + str(ii) for ii in range(self.dim_r)]
heatmap(
act_mat, xticklabels=xticklabels, yticklabels=yticklabels,
grayscale=grayscale, colorbar=colorbar, disp=disp)
def plot_trace(self, varname):
"""Plot the trace of a given variable"""
x = self.traces['t']
if varname is 'actC':
y = self.N2C(self.traces[varname[:-1]].T).T
else:
y = self.traces[varname]
plt.plot(x, y)
plt.xlabel('Time', fontsize=16)
plt.ylabel(varname, fontsize=16)
plt.grid(True)
plt.show()
def encode_symbols(num_symbols, coord='dist', dp=0., dim=None):
"""Generate the vector encodings of [num_symbols] symbols assuming a given similarity structure.
Each column vector will represent a unique symbol.
Usage:
>>> gsc.encode_symbols(2)
>>> gsc.encode_symbols(3, coord='dist', dp=0.3, dim=5)
: num_symbols : int, number of symbols to encode
: coord : string, 'dist' (distributed representation, default) or 'local' (local representation)
: dp : float (0 [default] <= dp <= 1) or 2D-numpy array of pairwise similarity (dot product)
: dim : int, number of dimensions to encode a symbol. must not be smaller than [num_symbols]
:
: [dp] and [dim] values are ignored if coord is set to 'local'.
"""
if coord == 'local':
sym_mat = np.eye(num_symbols)
else:
if dim is None:
dim = num_symbols
else:
if dim < num_symbols:
sys.exit("The [dim] value must be same as or greater than the [num_symbols] value.")
if isinstance(dp, numbers.Number):
dp = (dp * np.ones((num_symbols, num_symbols)) +
(1 - dp) * np.eye(num_symbols, num_symbols))
sym_mat = dot_products(num_symbols, dim, dp)
return sym_mat
def dot_products(num_symbols, dim, dp_mat, max_iter=100000):
"""Generate a 2D numpy array of random numbers such that the pairwise dot
products of column vectors are close to the numbers specified in [dp_mat].
Don Matthias wrote the original script in MATLAB for the LDNet program.
He explains how this program works as follows:
Given square matrix dpMatrix of dimension N-by-N, find N
dim-dimensional unit vectors whose pairwise dot products match
dpMatrix. Results are returned in the columns of M. itns is the
number of iterations of search required, and may be ignored.
Algorithm: Find a matrix M such that M'*M = dpMatrix. This is done
via gradient descent on a cost function that is the square of the
frobenius norm of (M'*M-dpMatrix).
NOTE: It has trouble finding more than about 16 vectors, possibly for
dumb numerical reasons (like stepsize and tolerance), which might be
fixable if necessary.
"""
if not (dp_mat.T == dp_mat).all():
sys.exit('dot_products: dp_mat must be symmetric')
if (np.diag(dp_mat) != 1).any():
sys.exit('dot_products: dp_mat must have all ones on the main diagonal')
sym_mat = np.random.uniform(
size=dim * num_symbols).reshape(dim, num_symbols, order='F')
min_step = .1
tol = 1e-6
converged = False
for iter_num in range(1, max_iter + 1):
inc = sym_mat.dot(sym_mat.T.dot(sym_mat) - dp_mat)
step = min(min_step, .01 / abs(inc).max())
sym_mat = sym_mat - step * inc
max_diff = abs(sym_mat.T.dot(sym_mat) - dp_mat).max()
if max_diff <= tol:
converged = True
break
if not converged:
print("Didn't converge after %d iterations" % max_iter)
return sym_mat
def heatmap(data, xlabel=None, ylabel=None, xticklabels=None, yticklabels=None,
grayscale=False, colorbar=True, rotate_xticklabels=False,
xtick=True, ytick=True, disp=True, val_range=None):
if grayscale:
cmap = plt.cm.get_cmap("gray_r")
else:
cmap = plt.cm.get_cmap("Reds")
if val_range is not None:
plt.imshow(data, cmap=cmap, vmin=val_range[0], vmax=val_range[1],
interpolation="nearest", aspect='auto')
else:
plt.imshow(data, cmap=cmap, interpolation="nearest", aspect='auto')
if xlabel is not None:
plt.xlabel(xlabel, fontsize=16)
if ylabel is not None:
plt.ylabel(ylabel, fontsize=16)
if xticklabels is not None:
if rotate_xticklabels:
plt.xticks(
np.arange(len(xticklabels)), xticklabels,
rotation='vertical')
else:
plt.xticks(np.arange(len(xticklabels)), xticklabels)
if yticklabels is not None:
plt.yticks(np.arange(len(yticklabels)), yticklabels)
if xtick is False:
plt.tick_params(
axis='x',
which='both',
bottom='off',
top='off',
labelbottom='off')
if ytick is False:
plt.tick_params(
axis='y',
which='both',
left='off',
right='off',
labelleft='off')
if colorbar:
plt.colorbar()
if disp:
plt.show()
def plot_TP(vec1, vec2, figsize=None):
'''Compute the outer product of two vectors and present it in a diagram.'''
nrow = vec1.shape[0]
ncol = vec2.shape[0]
radius = 0.4
arr = np.zeros((nrow + 1, ncol + 1))
arr[1:, 1:] = np.outer(vec1, vec2)
arr[0, 1:] = vec2
arr[1:, 0] = vec1
if figsize is None:
fig, ax = plt.subplots()
else:
fig, ax = plt.subplots(figsize=figsize)
for ii in range(nrow + 1):
for jj in range(ncol + 1):
if (ii == 0) and (jj == 0):
continue
if (ii == 0) or (jj == 0):
alpha = 1
else:
alpha = 1
if arr[ii, jj] >= 0:
curr_unit = plt.Circle(
(jj, -ii), radius,
color=plt.cm.gray(1 - abs(arr[ii, jj])),
alpha=alpha)
ax.add_artist(curr_unit)
curr_unit = plt.Circle(
(jj, -ii), radius,
color='k', fill=False)
ax.add_artist(curr_unit)
else:
curr_unit = plt.Circle(
(jj, -ii), radius,
color='k', fill=False)
ax.add_artist(curr_unit)
curr_unit = plt.Circle(
(jj, -ii), radius - 0.1,
color=plt.cm.gray(1 - abs(arr[ii, jj])),
alpha=alpha)
ax.add_artist(curr_unit)
curr_unit = plt.Circle(
(jj, -ii), radius - 0.1,
color='k', fill=False)
ax.add_artist(curr_unit)
ax.axis([
0 - radius - 0.6, ncol + radius + 0.6,
- nrow - radius - 0.6, 0 + radius + 0.6])
ax.set_aspect('equal', adjustable='box')
ax.axis('off')
def b_ind(f, r, net):
'''Get a binding index.'''
return f + r * net.num_fillers
def u_ind(phi, rho, net):
'''Get a unit index.'''
return phi + rho * net.dim_f
def w(f, r, phi, rho, net):
return net.TPinv[b_ind(f, r, net), u_ind(phi, rho, net)]
def get_a(n, net, f, r):
'''Check the activation value of a f/r binding.'''
act = 0
for phi in range(net.dim_f):
for rho in range(net.dim_r):
act += w(f, r, phi, rho, net) * n[u_ind(phi, rho, net)]
return act
def n2a(n, net, f=None, r=None):
if (f is None) and (r is None):
avec = np.zeros(net.num_bindings)
for f in range(net.num_fillers):
for r in range(net.num_roles):
avec[b_ind(f, r, net)] = get_a(n, net, f, r)
return avec
elif (f is None) and (r is not None):
avec = np.zeros(net.num_fillers)
for f in range(net.num_fillers):
avec[f] = get_a(n, net, f, r)
return avec
elif (f is not None) and (r is None):
avec = np.zeros(net.num_roles)
for r in range(net.num_roles):
avec[r] = get_a(n, net, f, r)
return avec
else:
return get_a(n, net, f, r)
def Q0E(net, n):
q0 = 0.0
for f in range(net.num_fillers):
for r in range(net.num_roles):
q0 += n2a(n, net, f=f, r=r)**2 * (1 - n2a(n, net, f=f, r=r))**2
return -q0
def Q0GradE(net, n):
q0grad = np.zeros(net.num_units)
for phi in range(net.dim_f):
for rho in range(net.dim_r):
q0grad[u_ind(phi, rho, net)] = 0.0
for f in range(net.num_fillers):
for r in range(net.num_roles):
a_fr = n2a(n, net, f, r)
g_fr = 2 * a_fr * (1 - a_fr) * (1 - 2 * a_fr)
q0grad[u_ind(phi, rho, net)] += w(
f, r, phi, rho, net) * g_fr
return -q0grad
def Q1E(net, n):
q1 = 0.0
for r in range(net.num_roles):
q1 += (np.sum(n2a(n, net, r=r)**2) - 1)**2
return -np.sum((np.sum(net.vec2mat(n2a(n, net))**2, axis=0) - 1)**2)
def Q1GradE(net, n):
q1grad = np.zeros(net.num_units)
for phi in range(net.dim_f):
for rho in range(net.dim_r):
unit_grad = 0.0
for r in range(net.num_roles):
var1 = np.sum(n2a(n, net, r=r)**2) - 1
var2 = 0.0
for f in range(net.num_fillers):
var2 += n2a(n, net, f, r) * w(f, r, phi, rho, net)
unit_grad += 4 * var1 * var2
q1grad[u_ind(phi, rho, net)] = unit_grad
return -q1grad
def Q0GradV(net, n):
a = net.N2C(n)
g = 2 * a * (1 - a) * (1 - 2 * a)
gmat = np.tile(g, (net.num_units, 1)).T
q0grad = np.sum(net.TPinv * gmat, axis=0)
return -q0grad
def Q1GradV(net, n):
a = net.N2C(n)
q1grad = 0.0
for r_ind, rr in enumerate(net.role_names):
curr_binding_ind = net.find_roles(rr)
amat = np.tile(a[curr_binding_ind], (net.num_units, 1)).T
term2 = np.sum(net.TPinv[curr_binding_ind, :] * amat, axis=0)
term1 = np.sum(a[curr_binding_ind] ** 2) - 1
q1grad += term1 * term2
q1grad = 4 * q1grad
return -q1grad