옛날에 해봤던 boosted stumps 에 이어, 얼마 전에 PyBrain 발견한 김에 한번 해봐야겠다 하고 적용했다.
같은 입력에 대해 은닉층 노드 4개 놓은 ANN 과 boosted stumps 의 비교. 은닉층은 2개까지 줄여봤는데 별 차이는 없었다. 물론 1개로 줄이면 선형회귀가 되니 흠좀무.
이런 구조를 갖는 문제에서는 역시 ANN 이 훨~씬~ 훨~씬~ 나은듯. 근데 내가 지금 풀고자 하는 문제는 사실 이런게 아닌데 이런걸로 직관 얻으려고 해봐야... ㅡ,.ㅡ 그래도 재미는 있구나. 흠
아, 실제로는 PyBrain 튜토리얼 따라서 XOR classification 해보려고 했는데 그것도 안되는 바람에 (!) ffnet 이라는 라이브러리로 전환. PyBrain 보다 훨씬 나은듯. 학습하고 나면 포트란 pre-written 소스코드로 익스포트도 해준다. 헐퀴
소스코드
lang:py
# a test implementation of boosted decision stumps: jongman@gmail.com
# run in ipython: %run -i play.py
from pylab import *
from itertools import izip
import sys
x, y, label = array([]), array([]), array([], dtype=int16)
class Functor:
def __init__(self, func, *args, **kwargs):
self.func = func
self.args = args
self.kwargs = kwargs
def __call__(self, *args):
return self.func(*(self.args + args), **self.kwargs)
def __repr__(self):
fn = self.func.func_name if type(self.func) == types.FunctionType else str(self.func)
return (fn + "("
+ ", ".join("%s" % str(v) for v in self.args)
+ ", ".join(["%s=%s" % (str(k),str(v))
for k, v in self.kwargs.iteritems()]) + ")")
# minimize sum of weights of misclassified samples
def trainDecisionStump(x, y, label, w = None):
if w == None: w = ones(len(x)) / len(x)
X = [x, y]
y = label
bestFeature, bestValue, bestMultiply, minWrong = 0, 0, 1, sum(w)
if abs(1.0 - sum(w)) > 1e-7:
print w
assert abs(1.0 - sum(w)) < 1e-7
for fidx in range(len(X)):
feature = X[fidx]
order = argsort(feature)
# if feature < threshold, predict -1
incorrect = sum(w[where(label == -1)])
for i in xrange(len(feature)):
if i > 0 and feature[order[i]] != feature[order[i-1]]:
mid = (feature[order[i]] + feature[order[i-1]]) * 0.5
# assume: feature < feature[order[i]], predict -1
wrong, mult = incorrect, 1
# if we are more than 50% wrong, reverse
if wrong > 0.5:
wrong = 1.0 - wrong
mult = -1
if wrong < minWrong:
minWrong = wrong
bestFeature = fidx
bestValue = mid
bestMultiply = mult
if label[order[i]] == -1:
incorrect -= w[order[i]]
else:
incorrect += w[order[i]]
return (bestFeature, bestValue, bestMultiply)
def predictDecisionStump(w, x, y):
features = [x,y]
return -1 if features[w[0]] * w[2] < w[1] * w[2] else 1
def trainAdaBoost(x, y, label, train, predict):
n = len(x)
if n == 0: return
ITERATIONS = 20
predictors = []
alphas = []
w = ones(n) / n
for t in xrange(ITERATIONS):
#print "iter %d ===, len %d" % (t, n)
predictors.append(train(x, y, label, w))
if predictors[-1] == None: return None
predicted = array([predict(predictors[-1], xx, yy) for xx, yy in izip(x, y)])
wrong = array([1 if p != l else 0 for p, l in izip(predicted, label)])
error = dot(wrong, w)
alphas.append(log((1-error)/error))
#print "iteration #%d: error %.4lf alpha %.4lf" % (t, error, alphas[-1])
if error <= 1e-8:
alphas = [1.0]
predictors = [predictors[-1]]
break
if t > 0 and error >= 0.5:
alphas.pop()
predictors.pop()
break
#print "incorrects are boosted by %.2lf" % exp(alphas[-1])
w = w * exp(alphas[-1] * wrong)
w = w / sum(w)
return (predict, predictors, alphas)
def predictAdaBoost(rep, x, y, quantize = True):
predict, predictors, alphas = rep
ret = 0.0
for predictor, alpha in izip(predictors, alphas):
ret += alpha * predict(predictor, x, y)
if quantize: return 1 if ret >= 0 else -1
return ret
def visualizeGrid(func, rep):
global c
RES = 20
unit = 100. / RES
x = arange(0,101,unit)
y = arange(0,101,unit)
X, Y = meshgrid(x, y)
c = zeros((len(y)-1, len(x)-1))
for yi in xrange(RES):
yy = (yi + 0.5) * unit
for xi in xrange(RES):
xx = (xi + 0.5) * unit
c[yi][xi] = func(rep, xx, yy, False)
#predictAdaBoost(rep, xx, yy, False)
#print "visualize (%d,%d) corresponds to (%.2lf, %.2lf) => %.2lf" % (yi,xi,yy,xx,c[yi][xi])
pcolor(X, Y, c, vmin=-1, vmax=1, alpha=0.8, cmap=cm.get_cmap("RdBu_r"))
if "cb" not in globals():
global cb
cb = colorbar()
cb.draw_all()
visualizeAdaBoost = Functor(visualizeGrid, predictAdaBoost)
def trainNN(x, y, label):
from ffnet import ffnet, mlgraph
input = zip(x, y)
target = [[l] for l in label]
conec = mlgraph((2, 4, 1))
net = ffnet(conec)
net.train_genetic(input, target, individuals=20, generations=500)
net.train_tnc(input, target, maxfun=1000, messages=1)
return net
def predictNN(rep, x, y, quantize = True):
ret = rep((x, y))
if quantize: return -1 if ret <= 0 else 1
return ret
visualizeNN = Functor(visualizeGrid, predictNN)
def trainLinear(x, y, label, w = None):
if w == None: w = ones(len(x))
X = vstack((x, y, ones(len(x)))).T
y = label
try:
ret = dot(inv(dot(X.T*w, X)), dot(X.T*w, y))
except:
# perhaps singular matrix
return None
return ret
def predictLinear(w, x, y):
return 1 if dot(array([x, y, 1]), w) >= 0 else -1
def visualizeLinear(rep, width=2):
a, b, c = rep
# horizontal line
if abs(a) < 1e-8:
plot([0, 100], [-c/b, -c/b], "k:", linewidth=width)
# vertical line
elif abs(b) < 1e-8:
plot([-c/a, -c/a], [0, 100], "k:", linewidth=width)
else:
x = arange(0,101,100)
plot(x, (-c - a*x) / b, "k:", linewidth=width)
# use linear regression
#train, predict, visualize = trainLinear, predictLinear, visualizeLinear
# use decision stump
#train, predict, visualize = trainDecisionStump, predictDecisionStump, None
# use adaboost with decision stump
#train, predict, visualize = Functor(trainAdaBoost, train=trainDecisionStump, predict=predictDecisionStump), predictAdaBoost, visualizeAdaBoost
# use adaboost with linear regression
#train, predict, visualize = Functor(trainAdaBoost, train=trainLinear, predict=predictLinear), predictAdaBoost, lambda x: visualizeAdaBoost(x) or map(lambda p: visualizeLinear(p[0], p[1] * 5), zip(x[1], x[2]))
# use neural network
train, predict, visualize = trainNN, predictNN, visualizeNN
def onclick(event):
global x, y, label
x = append(x, event.xdata)
y = append(y, event.ydata)
label = append(label, 1 if event.button == 1 else -1)
show()
def save():
return (x, y, label)
def load(rep):
global x, y, label
x, y, label = rep
show()
def show():
global pred
grid(True)
xlim((0, 100))
ylim((0, 100))
fig.axes[0].collections = []
fig.axes[0].lines = []
if len(label) > 0:
pred = train(x, y, label)
if pred != None:
predicted = array([predict(pred, xx, yy) for xx, yy in izip(x, y)])
if visualize:
visualize(pred)
else:
predicted = label
print "label", label
print "predicted", predicted
wrong = predicted != label
# remove what we plotted before
for l, color in zip([-1,1], ["blue", "red"]):
for c in [True,False]:
idx = intersect1d(where(wrong != c)[0], where(label == l)[0])
if len(idx) == 0: continue
if c:
scatter(x[idx], y[idx], s=80, marker="o", facecolor=color, edgecolor="w", linewidth=2)
else:
scatter(x[idx], y[idx], s=80, marker="x", edgecolor=color, linewidth=2)
sys.stdout.flush()
xlim((0, 100))
ylim((0, 100))
pylab.show()
if __name__ == "__main__":
global fig
close()
if "cb" in globals():
del cb
fig = figure()
cid = fig.canvas.mpl_connect('button_press_event', onclick)
show()
점점~ 길어지고~ 있구나..


