# Restricted Boltzmann Machines
# Importing the libraries
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.parallel
import torch.optim as optim
import torch.utils.data
from torch.autograd import Variable
# Importing the dataset
movies = pd.read_csv('ml-1m/movies.dat', sep = '::', header = None, engine = 'python', encoding = 'latin-1')
users = pd.read_csv('ml-1m/users.dat', sep = '::', header = None, engine = 'python', encoding = 'latin-1')
ratings = pd.read_csv('ml-1m/ratings.dat', sep = '::', header = None, engine = 'python', encoding = 'latin-1')
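# Note (not in the original script): the ml-1m files have no header row; the columns are
# (MovieID, Title, Genres) for movies.dat, (UserID, Gender, Age, Occupation, Zip-code) for
# users.dat and (UserID, MovieID, Rating, Timestamp) for ratings.dat. These DataFrames are
# loaded for reference only; the model below is trained on the ml-100k split.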
# Preparing the training set and the test set
training_set = pd.read_csv('ml-100k/u1.base', delimiter = '\t', header = None)
training_set = np.array(training_set, dtype = 'int')
test_set = pd.read_csv('ml-100k/u1.test', delimiter = '\t', header = None)
test_set = np.array(test_set, dtype = 'int')
# Getting the number of users and movies
nb_users = int(max(max(training_set[:,0]), max(test_set[:,0])))
nb_movies = int(max(max(training_set[:,1]), max(test_set[:,1])))
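# For the MovieLens 100k split used here this should come out to 943 users and 1682 movies
# (a side note, not in the original script).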
# Converting the data into an array with users in lines and movies in columns
# we build a list of lists (one list of ratings per user) because that is the
# structure torch.FloatTensor expects below
def convert(data):
    new_data = []
    for id_users in range(1, nb_users + 1):
        id_movies = data[:, 1][data[:, 0] == id_users]
        id_ratings = data[:, 2][data[:, 0] == id_users]
        ratings = np.zeros(nb_movies)
        ratings[id_movies - 1] = id_ratings
        new_data.append(list(ratings))
    return new_data
training_set = convert(training_set)
test_set = convert(test_set)
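# Sanity check (a minimal sketch, not in the original script): after the conversion
# every user has exactly one slot per movie, with 0 for movies that user never rated.
assert len(training_set) == nb_users
assert all(len(row) == nb_movies for row in training_set)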
# Converting the data into Torch tensors
training_set = torch.FloatTensor(training_set)
test_set = torch.FloatTensor(test_set)
# Converting the ratings into binary ratings: 1 (liked) or 0 (not liked); -1 marks movies the user has not rated
training_set[training_set == 0] = -1
training_set[training_set == 1] = 0
training_set[training_set == 2] = 0
training_set[training_set >= 3] = 1
test_set[test_set == 0] = -1
test_set[test_set == 1] = 0
test_set[test_set == 2] = 0
test_set[test_set >= 3] = 1
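# Sanity check (a minimal sketch, not in the original script): after binarisation the
# only values left are -1 (not rated), 0 (not liked) and 1 (liked).
assert set(torch.unique(training_set).tolist()).issubset({-1.0, 0.0, 1.0})
assert set(torch.unique(test_set).tolist()).issubset({-1.0, 0.0, 1.0})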
# Creating the architecture of the Neural Network
class RBM():
    # nv - number of visible nodes
    # nh - number of hidden nodes
    # W - weights
    # a - bias of the hidden nodes given the visible nodes
    # b - bias of the visible nodes given the hidden nodes
    def __init__(self, nv, nh):
        self.W = torch.randn(nh, nv)
        self.a = torch.randn(1, nh)
        self.b = torch.randn(1, nv)
    # sample the probabilities of the hidden nodes given the visible nodes
    # x - the visible neurons v, used to compute p_h_given_v
    # mm - matrix product of two Torch tensors
    # t() - transpose
    # self.a.expand_as(wx) - make sure the bias is added to each line of the mini-batch
    # p_h_given_v - probability that a hidden node is activated given the visible nodes
    #               (i.e. that it detects some pattern in the ratings)
    # bernoulli - the outcome is binary (a user likes a movie or not); torch.bernoulli
    #             returns a vector of 1s and 0s, where 1 means the neuron was activated:
    #               1 with probability p
    #               0 with probability 1 - p
    def sample_h(self, x):
        wx = torch.mm(x, self.W.t())
        activation = wx + self.a.expand_as(wx)
        p_h_given_v = torch.sigmoid(activation)
        return p_h_given_v, torch.bernoulli(p_h_given_v)
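    # Illustration (a hedged note, not in the original file): torch.bernoulli turns
    # probabilities into a binary sample, for example
    #   torch.bernoulli(torch.tensor([[0.9, 0.1]]))
    # most often returns tensor([[1., 0.]]) - each entry is 1 with its given probability.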
    # sample the probabilities of the visible nodes given the hidden nodes
    # y - the hidden neurons h, used to compute p_v_given_h
    def sample_v(self, y):
        wy = torch.mm(y, self.W)
        activation = wy + self.b.expand_as(wy)
        p_v_given_h = torch.sigmoid(activation)
        return p_v_given_h, torch.bernoulli(p_v_given_h)
    # Contrastive Divergence
    # v0 - ratings of all movies by one user (the input batch)
    # vk - visible nodes obtained after k rounds of Gibbs sampling
    # ph0 - probabilities that the hidden nodes equal 1 given v0 (first iteration)
    # phk - probabilities that the hidden nodes equal 1 given vk (after k samplings)
    def train(self, v0, vk, ph0, phk):
        self.W += (torch.mm(v0.t(), ph0) - torch.mm(vk.t(), phk)).t()
        self.b += torch.sum((v0 - vk), 0)
        self.a += torch.sum((ph0 - phk), 0)
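    # In equation form (a restatement of the update above, using this file's variable names):
    #   W <- W + ( v0^T . ph0  -  vk^T . phk )^T
    #   b <- b + sum over the batch of (v0 - vk)
    #   a <- a + sum over the batch of (ph0 - phk)
    # This is the approximate contrastive-divergence gradient step with learning rate 1.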
nv = len(training_set[0])   # number of visible nodes = number of movies
nh = 100                    # number of hidden nodes (features to detect)
batch_size = 100
rbm = RBM(nv, nh)
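# Illustrative use (a minimal sketch, not part of the original training flow):
# sample_h maps a batch of visible ratings to hidden activation probabilities plus a
# Bernoulli sample of them; sample_v maps hidden states back to the visible layer.
_p_h, _h_sample = rbm.sample_h(training_set[0:1])   # shapes: (1, nh) and (1, nh)
_p_v, _v_sample = rbm.sample_v(_h_sample)           # shapes: (1, nv) and (1, nv)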
# Training the RBM
nb_epoch = 10
for epoch in range(1, nb_epoch + 1):
    train_loss = 0
    s = 0.
    for id_user in range(0, nb_users - batch_size, batch_size):
        vk = training_set[id_user:id_user + batch_size]
        v0 = training_set[id_user:id_user + batch_size]
        ph0, _ = rbm.sample_h(v0)
        # k steps of Gibbs sampling (Contrastive Divergence)
        for k in range(10):
            _, hk = rbm.sample_h(vk)
            _, vk = rbm.sample_v(hk)
            # keep the -1 entries (movies the user never rated) frozen
            vk[v0 < 0] = v0[v0 < 0]
        phk, _ = rbm.sample_h(vk)
        rbm.train(v0, vk, ph0, phk)
        train_loss += torch.mean(torch.abs(v0[v0 >= 0] - vk[v0 >= 0]))
        s += 1.
    print('epoch: ' + str(epoch) + ' loss: ' + str(train_loss / s))
# Testing the RBM
test_loss = 0
s = 0.
for id_user in range(nb_users):
    v = training_set[id_user:id_user + 1]
    vt = test_set[id_user:id_user + 1]
    if len(vt[vt >= 0]) > 0:
        _, h = rbm.sample_h(v)
        _, v = rbm.sample_v(h)
        test_loss += torch.mean(torch.abs(vt[vt >= 0] - v[vt >= 0]))
        s += 1.
print('test loss: ' + str(test_loss / s))
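# Usage sketch (an illustration, not part of the original script): after training, the RBM
# can reconstruct one user's ratings, which can be read as like/dislike predictions for
# movies that user has not rated yet.
_, _h = rbm.sample_h(training_set[0:1])
_pred, _ = rbm.sample_v(_h)
# _pred[0, j] is the predicted probability that user 1 would like movie j + 1.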