Code snippets for page Gradient Descent and the Newton MethodΒΆ

Download gradnewton.py. Browse the code snippet index.

# -*- coding: utf-8 -*-
# Generated by codesnippet sphinx extension on 2016-03-08

import mdp
import numpy as np
np.random.seed(0)
import bimdp
class NewtonNode(bimdp.nodes.IdentityBiNode):
    """Node to implement gradient descent with the Newton method.

    :param sender_id: id of the SenderBiNode that is at the front of
      the flow, or at the point where the x value should be optimized.
    """
    def __init__(self, sender_id, **kwargs):
        self.sender_id = sender_id
        super(NewtonNode, self).__init__(**kwargs)

    @bimdp.binode_coroutine(["grad", "msg_x", "msg"])
    def _newton(self, y_goal, n_iterations, x_start, msg):
        """Try to reach the given y value with gradient descent.

        The Newton method is used to calculate the next point.
        """
        # can't use function decorator, since this is a coroutine
        with mdp.extension("gradient"):
            # get the y value for the output
            msg = {self.node_id + "->method": "newton", "method": "gradient"}
            y, grad, x, _ = yield x_start, msg.copy(), self.sender_id
            for _ in xrange(n_iterations):
                # use Newton's method to get the new data point
                error = np.sum((y - y_goal) ** 2, axis=1)
                error_grad = np.sum(2 * (y - y_goal)[:,:,np.newaxis] * grad,
                                    axis=1)
                err_grad_norm = np.sqrt(np.sum(error_grad**2, axis=1))
                unit_error_grad = error_grad / err_grad_norm[:,np.newaxis]
                # x_{n+1} = x_n - f(x_n) / f'(x_n)
                x = x - (error / err_grad_norm)[:,np.newaxis] * unit_error_grad
                y, grad, x, _ = yield x, msg.copy(), self.sender_id
            raise StopIteration(x, None, "exit")

n_iterations = 3
show_inspection = True

sfa_node = bimdp.nodes.SFA2BiNode(input_dim=4*4, output_dim=5)
switchboard = bimdp.hinet.Rectangular2dBiSwitchboard(
                                in_channels_xy=8,
                                field_channels_xy=4,
                                field_spacing_xy=2)
flownode = bimdp.hinet.BiFlowNode(bimdp.BiFlow([sfa_node]))
sfa_layer = bimdp.hinet.CloneBiLayer(flownode,
                                     switchboard.output_channels)
flow = bimdp.BiFlow([switchboard, sfa_layer])
train_data = [np.random.random((10, switchboard.input_dim))
              for _ in range(3)]
flow.train([None, train_data])

sender_node = bimdp.nodes.SenderBiNode(node_id="sender", recipient_id="newton")
newton_node = NewtonNode(sender_id="sender", input_dim=sfa_layer.output_dim,
                         node_id="newton")
flow = sender_node + flow + newton_node

x_goal = np.random.random((2, switchboard.input_dim))
goal_y, msg = flow.execute(x_goal)

x_start = np.random.random((2, switchboard.input_dim))
y_start, _ = flow.execute(x_start)

msg = {"method": "newton", "n_iterations": n_iterations, "x_start": x_start}
if show_inspection:
    _, (x, msg) = bimdp.show_execution(flow, x=goal_y, msg=msg, target="newton")
else:
    x, msg = flow.execute(goal_y, msg, "newton")

y, _ = flow.execute(x)
print ("errors before optimization: %s" %
       np.sum((y_start - goal_y)**2, axis=1))
# Expected:
## errors before optimization: [ 196.49202899  241.31524993]
print ("errors after optimization  : %s" %
       np.sum((y - goal_y)**2, axis=1))
# Expected:
## errors after optimization  : [ 39.01372025  34.84160123]