DBN nodes

Download dbn_nodes.py.

"""
This file contains the original DBN node from Pietro.
"""

import mdp
from mdp import numx
from mdp.utils import mult

random = mdp.numx_rand.random
exp = mdp.numx.exp

# TODO: DBNLayer with labels
# TODO: DBNLayer with Gaussian visible variables
# TODO: remove self._rbm after greedy phase
# TODO: some functionality of RBMNode is duplicated with small variation;
#    should this be a subclass of RBMNode?

class DBNLayerNode(mdp.Node):

    def __init__(self, hidden_dim, visible_dim=None, dtype=None):
        super(DBNLayerNode, self).__init__(input_dim=visible_dim,
                                           output_dim=hidden_dim,
                                           dtype=dtype)
        # delegates computations to an RBM during the greedy phase
        self._rbm = mdp.nodes.RBMNode(hidden_dim, visible_dim, dtype)
        self._updown_initialized = False

    def sample_h(self, v):
        """Sample the hidden variables given observations v.

        Returns a tuple (prob_h, h), where prob_h[n,i] is the
        probability that hidden variable 'i' is one given the
        observations v[n,:], and h[n,i] is a sample from that
        posterior probability."""
        self._pre_execution_checks(v)
        return self._sample_h(v)

    def sample_v(self, h):
        """Sample the observed variables given hidden variable state h.

        Returns a tuple (prob_v, v), where prob_v[n,i] is the
        probability that visible variable 'i' is one given the hidden
        variables h[n,:], and v[n,i] is a sample from that conditional
        probability."""
        self._pre_inversion_checks(h)
        return self._sample_v(h)

    def _sample_h(self, v):
        # P(h=1|v) under the recognition weights w_rec and hidden biases bh
        probs = 1./(1. + exp(-self.bh - mult(v, self.w_rec)))
        h = (probs > random(probs.shape)).astype('d')
        return probs, h

    def _sample_v(self, h):
        # P(v=1|h) under the generative weights w_gen and visible biases bv
        probs = 1./(1. + exp(-self.bv - mult(h, self.w_gen.T)))
        v = (probs > random(probs.shape)).astype('d')
        return probs, v
    
    def _execute(self, v, return_probs=True):
        """If 'return_probs' is True, returns the probability of the
        hidden variables h[n,i] being 1 given the observations v[n,:].
        If 'return_probs' is False, return a sample from that probability.
        """
        probs, h = self._sample_h(v)
        if return_probs:
            return probs
        else:
            return h

    def _inverse(self, h):
        probs, v = self._sample_v(h)
        return v

    # greedy phase training: delegate to RBM
    def _train(self, x, epsilon=0.1, decay=0., momentum=0.):
        self._rbm.train(x, epsilon=epsilon, decay=decay, momentum=momentum)

    def _stop_training(self):
        self._rbm.stop_training()
        self._init_updown()
    # /greedy phase training

    def _init_updown(self):
        """
        The greedy phase of learning is delegated to an RBM.
        Here the learned weights are decoupled into generative and
        recognition weights, and other parameters are initialized.
        """
        self._updown_initialized = True
        # recognition and generative weights
        self.w_rec = self._rbm.w.copy()
        self.w_gen = self._rbm.w.copy()
        # biases
        self.bv = self._rbm.bv.copy()
        self.bh = self._rbm.bh.copy()
        # most recent parameter updates, stored to implement momentum
        self.dw_wake = 0.
        self.dw_sleep = 0.
        self.dbv = self.dbh = 0.
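        # (these start as scalar zeros and are broadcast to full-size
        # arrays by the first parameter update)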

    # this corresponds to the wake phase
    def _up_pass(self, v, epsilon=0.1, decay=0., momentum=0.):
        """
        Returns (sample from hidden layer, norm of the weight change)
        """
        self._pre_execution_checks(v)

        # sample from hidden layer
        ph, h = self._sample_h(v)
        # reconstruct input
        pv1, v1 = self._sample_v(h)
        
        # adapt generative weights
        delta = mult((v - pv1).T, h)/v.shape[0]
        self.dw_wake = (momentum*self.dw_wake
                        + epsilon*(delta - decay*self.w_gen))
        self.w_gen += self.dw_wake

        # adapt visible biases
        delta = (v - pv1).mean(axis=0)
        self.dbv = momentum*self.dbv + epsilon*delta
        self.bv += self.dbv

        return h, ph, mdp.utils.norm2(self.dbv)

    # this corresponds to the sleep phase
    def _down_pass(self, h, top_updates=0, epsilon=0.1, decay=0., momentum=0.):
        """
        Returns (v, pv, norm of the hidden-bias update), where v is a
        sample from the visible layer given h, and pv the corresponding
        probabilities.

        top_updates -- set > 0 for the top node, so that the extra
                       alternating Gibbs updates make it sample from
                       the prior
        """
        # TODO: check input

        pv, v = self._sample_v(h)
        for _ in range(top_updates):
            ph, h = self._sample_h(v)
            pv, v = self._sample_v(h)
            
        # reconstruct hidden state
        ph1, h1 = self._sample_h(v)
        
        # adapt recognition weights
        delta = mult(v.T, (h - ph1))/v.shape[0]
        self.dw_sleep = (momentum*self.dw_sleep
                         + epsilon*(delta - decay*self.w_rec))
        self.w_rec += self.dw_sleep

        # adapt hidden biases
        delta = (h - ph1).mean(axis=0)
        self.dbh = momentum*self.dbh + epsilon*delta
        self.bh += self.dbh
        
        return v, pv, mdp.utils.norm2(self.dbh)
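
# ----------------------------------------------------------------------
# Minimal usage sketch (illustration only): greedy-train a single layer
# on random binary data, then run one wake/sleep step by calling the
# private passes directly. The dimensions and learning settings below
# are illustrative assumptions, not values from the original file.
if __name__ == '__main__':
    n_samples, visible_dim, hidden_dim = 100, 20, 10
    x = (mdp.numx_rand.random((n_samples, visible_dim)) > 0.5).astype('d')

    node = DBNLayerNode(hidden_dim, visible_dim)
    for _ in range(10):              # greedy (RBM) phase
        node.train(x, epsilon=0.1)
    node.stop_training()             # decouples w_rec from w_gen

    # one up-down fine-tuning step (wake phase, then sleep phase)
    h, ph, dbv_norm = node._up_pass(x, epsilon=0.1)
    v, pv, dbh_norm = node._down_pass(h, epsilon=0.1)
    print('bias-update norms:', dbv_norm, dbh_norm)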