(update) weighted linear regression + boosting 추가
2차원 평면에 두 종류의 점들을 늘어놓고 (여기서는 빨간점과 파란점) 이들의 좌표를 자질로 해서 점의 종류를 판별하는 문제는 여러 기계 분류 알고리즘을 설명하기 위해 자주 쓰인다. 오늘 Decision Stump 의 곱을 부스팅 하는 것에 관한 논문을 읽다가 심심한데 한번 해볼까 하고 저녁 내내 (ㅠ.ㅠ 어느덧 열한시...) 구현하였다. 일단은 가장 일반적인 additive AdaBoost 버전으로. :-)
그림은 XOR 에 해당하는 입력을 주고, 학습시킨 뒤 다시 분류 시킨 것. 동그라미들과 엑스표들은 입력 (동그라미는 제대로 분류된 것, X 는 잘못 분류된 것) 배경 색은 해당 위치에 대한 predictor 의 답을 보여준다. 물론 오버피팅 쩔긴 하지만 이 정도면 놀랍도록 잘 되는 축인 듯. :) Iteration 수는 20회.
처음엔 저번에 산 Machine Learning: An Algorithmic Perspective 를 보고 구현했는데, 설명은 참 쉽게 잘 되어 있지만 algorithmic perspective 라서 그런지 증명과 수학, rationale 얘기는 정말 하나도 없구나... -_- 그냥 아무 생각 없이 뭔지 알고 써먹기만 할 때 유용할 것 같다. 회사에서는 뽀스 아저씨의 Elements of Statistical Learning 을 빌려 보고 있는데 이거 괜찮다. 좀 마이 어렵지만... 두 권을 병행해서 보니 좀 이해가 잘 된다. (해수 참고하셈;;;)
지금까지 한번도 안 써본 matplotlib 의 인터랙티브 기능을 이용해 구현했는데 정말 편했다. 클릭할 때마다 점을 추가하고 자동 학습 + 보여주는 것을 정말 간단하게 구현할 수 있었다. 이벤트 드리븐으로 짤 수 있는데다, ipython 세션이 항상 떠 있으니 디버거가 항상 같이 떠 있는 셈. 처음에는 컬러맵 (MPL 에서 pcolor 함수) 가 지원되는 줄 모르고 pyGTK 나 tkinter 써야 하나.. 고민했는데, 그거 썼으면 하루는 꼬박 더 걸렸을 듯 싶다. MPL + NumPy + SciPy 에 대한 나의 애착은 점점 커져만 가는데;;;; 튜토리얼이라도 함 쓰고 싶지만.. 아. 나 원고해야되는데. -_-
스샷 몇개 더 + 소스코드
복잡한 모양을 만들어 봤다. 그래도 이정도면 매우 준수한 (...)데...
이정도는 매우 가뿐하게.. 분류한다.
이거는 sanity check 용으로 처음에 구현했던 Linear Discriminant Analysis.. 라지만 binary class 이니 그냥 선형회귀법과 같은 셈이다.
boosting + (weighted) linear regression.
소스코드. 별게 아니라 어디 가져가시진 않겠지만.. 혹시라도 그러실거면 말씀이라도..
lang:py
# a test implementation of boosted decision stumps: jongman@gmail.com
# run in ipython: %run -i play.py
from pylab import *
from itertools import izip
import sys
x, y, label = array([]), array([]), array([], dtype=int16)
class Functor:
def __init__(self, func, **kwargs):
self.func = func
self.kwargs = kwargs
def __call__(self, *args):
return self.func(*args, **self.kwargs)
def __repr__(self):
fn = self.func.func_name if type(self.func) == types.FunctionType else str(self.func)
return fn + "(" + ", ".join(["%s=%s" % (str(k),str(v)) for k, v in self.kwargs.iteritems()]) + ")"
# minimize sum of weights of misclassified samples
def trainDecisionStump(x, y, label, w = None):
if w == None: w = ones(len(x)) / len(x)
X = [x, y]
y = label
bestFeature, bestValue, bestMultiply, minWrong = 0, 0, 1, sum(w)
if abs(1.0 - sum(w)) > 1e-7:
print w
assert abs(1.0 - sum(w)) < 1e-7
for fidx in range(len(X)):
feature = X[fidx]
order = argsort(feature)
# if feature < threshold, predict -1
incorrect = sum(w[where(label == -1)])
for i in xrange(len(feature)):
if i > 0 and feature[order[i]] != feature[order[i-1]]:
mid = (feature[order[i]] + feature[order[i-1]]) * 0.5
# assume: feature < feature[order[i]], predict -1
wrong, mult = incorrect, 1
# if we are more than 50% wrong, reverse
if wrong > 0.5:
wrong = 1.0 - wrong
mult = -1
if wrong < minWrong:
minWrong = wrong
bestFeature = fidx
bestValue = mid
bestMultiply = mult
if label[order[i]] == -1:
incorrect -= w[order[i]]
else:
incorrect += w[order[i]]
return (bestFeature, bestValue, bestMultiply)
def predictDecisionStump(w, x, y):
features = [x,y]
return -1 if features[w[0]] * w[2] < w[1] * w[2] else 1
def trainAdaBoost(x, y, label, train, predict):
n = len(x)
if n == 0: return
ITERATIONS = 20
predictors = []
alphas = []
w = ones(n) / n
for t in xrange(ITERATIONS):
#print "iter %d ===, len %d" % (t, n)
predictors.append(train(x, y, label, w))
if predictors[-1] == None: return None
predicted = array([predict(predictors[-1], xx, yy) for xx, yy in izip(x, y)])
wrong = array([1 if p != l else 0 for p, l in izip(predicted, label)])
error = dot(wrong, w)
alphas.append(log((1-error)/error))
#print "iteration #%d: error %.4lf alpha %.4lf" % (t, error, alphas[-1])
if error <= 1e-8:
alphas = [1.0]
predictors = [predictors[-1]]
break
if t > 0 and error >= 0.5:
alphas.pop()
predictors.pop()
break
#print "incorrects are boosted by %.2lf" % exp(alphas[-1])
w = w * exp(alphas[-1] * wrong)
w = w / sum(w)
return (predict, predictors, alphas)
def predictAdaBoost(rep, x, y, quantize = True):
predict, predictors, alphas = rep
ret = 0.0
for predictor, alpha in izip(predictors, alphas):
ret += alpha * predict(predictor, x, y)
if quantize: return 1 if ret >= 0 else -1
return ret
def visualizeAdaBoost(rep):
global c
RES = 20
unit = 100. / RES
x = arange(0,101,unit)
y = arange(0,101,unit)
X, Y = meshgrid(x, y)
c = zeros((len(y)-1, len(x)-1))
for yi in xrange(RES):
yy = (yi + 0.5) * unit
for xi in xrange(RES):
xx = (xi + 0.5) * unit
c[yi][xi] = predictAdaBoost(rep, xx, yy, False)
#print "visualize (%d,%d) corresponds to (%.2lf, %.2lf) => %.2lf" % (yi,xi,yy,xx,c[yi][xi])
pcolor(X, Y, c, vmin=-1, vmax=1, alpha=0.8, cmap=cm.get_cmap("RdBu_r"))
if "cb" not in globals():
global cb
cb = colorbar()
cb.draw_all()
def trainLinear(x, y, label, w = None):
if w == None: w = ones(len(x))
X = vstack((x, y, ones(len(x)))).T
y = label
try:
ret = dot(inv(dot(X.T*w, X)), dot(X.T*w, y))
except:
# perhaps singular matrix
return None
return ret
def predictLinear(w, x, y):
return 1 if dot(array([x, y, 1]), w) >= 0 else -1
def visualizeLinear(rep, width=2):
a, b, c = rep
# horizontal line
if abs(a) < 1e-8:
plot([0, 100], [-c/b, -c/b], "k:", linewidth=width)
# vertical line
elif abs(b) < 1e-8:
plot([-c/a, -c/a], [0, 100], "k:", linewidth=width)
else:
x = arange(0,101,100)
plot(x, (-c - a*x) / b, "k:", linewidth=width)
# use linear regression
#train, predict, visualize = trainLinear, predictLinear, visualizeLinear
# use decision stump
#train, predict, visualize = trainDecisionStump, predictDecisionStump, None
# use adaboost with decision stump
#train, predict, visualize = Functor(trainAdaBoost, train=trainDecisionStump, predict=predictDecisionStump), predictAdaBoost, visualizeAdaBoost
# use adaboost with linear regression
train, predict, visualize = Functor(trainAdaBoost, train=trainLinear, predict=predictLinear), predictAdaBoost, lambda x: visualizeAdaBoost(x) or map(lambda p: visualizeLinear(p[0], p[1] * 5), zip(x[1], x[2]))
def onclick(event):
global x, y, label
x = append(x, event.xdata)
y = append(y, event.ydata)
label = append(label, 1 if event.button == 1 else -1)
show()
def show():
global pred
grid(True)
xlim((0, 100))
ylim((0, 100))
fig.axes[0].collections = []
fig.axes[0].lines = []
if len(label) > 0:
pred = train(x, y, label)
if pred != None:
predicted = array([predict(pred, xx, yy) for xx, yy in izip(x, y)])
if visualize:
visualize(pred)
else:
predicted = label
wrong = predicted != label
# remove what we plotted before
for l, color in zip([-1,1], ["blue", "red"]):
for c in [True,False]:
idx = intersect1d(where(wrong != c)[0], where(label == l)[0])
if len(idx) == 0: continue
if c:
scatter(x[idx], y[idx], s=80, marker="o", facecolor=color, edgecolor="w", linewidth=2)
else:
scatter(x[idx], y[idx], s=80, marker="x", edgecolor=color, linewidth=2)
sys.stdout.flush()
xlim((0, 100))
ylim((0, 100))
pylab.show()
if __name__ == "__main__":
global fig
close()
if "cb" in globals():
del cb
fig = figure()
cid = fig.canvas.mpl_connect('button_press_event', onclick)
show()
#set_cmap()


