""" This file contains the original DBN node from Pietro. """ import mdp from mdp import numx from mdp.utils import mult random = mdp.numx_rand.random exp = mdp.numx.exp # TODO: DBNLayer with labels # TODO: DBNLayer with Gaussian visible variables # TODO: remove self._rbm after greedy phase # TODO: some functionality of RBMNode is duplicated with small variation; # should this be a subclass of RBMNode? class DBNLayerNode(mdp.Node): def __init__(self, hidden_dim, visible_dim = None, dtype = None): super(DBNLayerNode, self).__init__(input_dim = visible_dim, output_dim = hidden_dim, dtype = dtype) # delegates computations to an RBM during the greedy phase self._rbm = mdp.nodes.RBMNode(hidden_dim, visible_dim, dtype) self._updown_initialized = False def sample_h(self, v): """Sample the hidden variables given observations v. Returns a tuple (prob_h, h), where prob_h[n,i] is the probability that variable 'i' is one given the observations v[n,:], and h[n,i] is a sample from the posterior probability.""" self._pre_execution_checks(v) return self._sample_h(v) def sample_v(self, h): """Sample the observed variables given hidden variable state h. Returns a tuple (prob_v, v), where prob_v[n,i] is the probability that variable 'i' is one given the hidden variables h[n,:], and v[n,i] is a sample from that conditional probability.""" self._pre_inversion_checks(h) return self._sample_v(h) def _sample_h(self, v): # P(h=1|v,W,b) probs = 1./(1. + exp(-self.bh - mult(v, self.w_rec))) h = (probs > random(probs.shape)).astype('d') return probs, h def _sample_v(self, h): # P(v=1|h,W,b) probs = 1./(1. + exp(-self.bv - mult(h, self.w_gen.T))) v = (probs > random(probs.shape)).astype('d') return probs, v def _execute(self, v, return_probs=True): """If 'return_probs' is True, returns the probability of the hidden variables h[n,i] being 1 given the observations v[n,:]. If 'return_probs' is False, return a sample from that probability. """ probs, h = self._sample_h(v) if return_probs: return probs else: return h def _inverse(self, h): probs, v = self._sample_v(h) return v # greedy phase training: delegate to RBM def _train(self, x, epsilon=0.1, decay=0., momentum=0.): self._rbm.train(x, epsilon=epsilon, decay=decay, momentum=momentum) def _stop_training(self): self._rbm.stop_training() self._init_updown() # /greedy phase training def _init_updown(self): """ The greedy phase of learning is delegated to an RBM. Here the learned weights are decoupled into generative and recognition weights, and other parameters are initialized. """ self._updown_initialized = True # recognition and generative weights self.w_rec = self._rbm.w.copy() self.w_gen = self._rbm.w.copy() # biases self.bv = self._rbm.bv.copy() self.bh = self._rbm.bh.copy() # changes in parameters during learning # used to add momentum self.dw_wake = 0. self.dw_sleep = 0. self.dbv = self.dbh = 0. 
    # this corresponds to the wake phase
    def _up_pass(self, v, epsilon=0.1, decay=0., momentum=0.):
        """Return (sample from the hidden layer, P(h=1|v), norm of the
        visible-bias update)."""
        self._pre_execution_checks(v)

        # sample from hidden layer
        ph, h = self._sample_h(v)
        # reconstruct input
        pv1, v1 = self.sample_v(h)

        # adapt generative weights
        delta = mult((v - pv1).T, h)/v.shape[0]
        self.dw_wake = (momentum*self.dw_wake
                        + epsilon*(delta - decay*self.w_gen))
        self.w_gen += self.dw_wake

        # adapt biases
        delta = (v - pv1).mean(axis=0)
        self.dbv = momentum*self.dbv + epsilon*delta
        self.bv += self.dbv

        return h, ph, mdp.utils.norm2(self.dbv)

    # this corresponds to the sleep phase
    def _down_pass(self, h, top_updates=0, epsilon=0.1, decay=0., momentum=0.):
        """Return (sample from the visible layer, P(v=1|h), norm of the
        hidden-bias update).

        top_updates -- set >0 for the top node, so that it ends up sampling
                       from the prior
        """
        # TODO: check input
        pv, v = self._sample_v(h)
        for _ in range(top_updates):
            ph, h = self._sample_h(v)
            pv, v = self._sample_v(h)

        # reconstruct hidden state
        ph1, h1 = self._sample_h(v)

        # adapt recognition weights
        delta = mult(v.T, (h - ph1))/v.shape[0]
        self.dw_sleep = (momentum*self.dw_sleep
                         + epsilon*(delta - decay*self.w_rec))
        self.w_rec += self.dw_sleep

        # adapt biases
        delta = (h - ph1).mean(axis=0)
        self.dbh = momentum*self.dbh + epsilon*delta
        self.bh += self.dbh

        return v, pv, mdp.utils.norm2(self.dbh)
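

# Minimal usage sketch (not part of the original node): it illustrates the
# intended flow of greedy RBM pre-training followed by up-down (wake-sleep)
# fine-tuning. The random binary data, layer sizes, and learning parameters
# below are arbitrary choices for illustration only; _up_pass/_down_pass are
# private and would normally be driven by an enclosing DBN trainer.
if __name__ == '__main__':
    n_samples, visible_dim, hidden_dim = 100, 20, 10
    # random binary observations, just to exercise the node
    x = (mdp.numx_rand.random((n_samples, visible_dim)) > 0.5).astype('d')

    layer = DBNLayerNode(hidden_dim, visible_dim=visible_dim)

    # greedy phase: training is delegated to the internal RBMNode
    for _ in range(10):
        layer.train(x, epsilon=0.1, decay=0., momentum=0.)
    # stop_training decouples the RBM weights into w_rec and w_gen
    layer.stop_training()

    # one up-down fine-tuning step: wake pass adapts generative weights,
    # sleep pass adapts recognition weights
    h, ph, dbv_norm = layer._up_pass(x, epsilon=0.1)
    v, pv, dbh_norm = layer._down_pass(h, epsilon=0.1)

    # forward execution returns P(h=1|v) by default
    hidden_probs = layer.execute(x)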