JMK no matter what

Boosted Stumps 를 이용한 2차원 평면 상에서의 classification

(update) weighted linear regression + boosting 추가

2차원 평면에 두 종류의 점들을 늘어놓고 (여기서는 빨간점과 파란점) 이들의 좌표를 자질로 해서 점의 종류를 판별하는 문제는 여러 기계 분류 알고리즘을 설명하기 위해 자주 쓰인다. 오늘 Decision Stump 의 곱을 부스팅 하는 것에 관한 논문을 읽다가 심심한데 한번 해볼까 하고 저녁 내내 (ㅠ.ㅠ 어느덧 열한시...) 구현하였다. 일단은 가장 일반적인 additive AdaBoost 버전으로. :-)

그림은 XOR 에 해당하는 입력을 주고, 학습시킨 뒤 다시 분류 시킨 것. 동그라미들과 엑스표들은 입력 (동그라미는 제대로 분류된 것, X 는 잘못 분류된 것) 배경 색은 해당 위치에 대한 predictor 의 답을 보여준다. 물론 오버피팅 쩔긴 하지만 이 정도면 놀랍도록 잘 되는 축인 듯. :) Iteration 수는 20회.

처음엔 저번에 산 Machine Learning: An Algorithmic Perspective 를 보고 구현했는데, 설명은 참 쉽게 잘 되어 있지만 algorithmic perspective 라서 그런지 증명과 수학, rationale 얘기는 정말 하나도 없구나... -_- 그냥 아무 생각 없이 뭔지 알고 써먹기만 할 때 유용할 것 같다. 회사에서는 뽀스 아저씨의 Elements of Statistical Learning 을 빌려 보고 있는데 이거 괜찮다. 좀 마이 어렵지만... 두 권을 병행해서 보니 좀 이해가 잘 된다. (해수 참고하셈;;;)

지금까지 한번도 안 써본 matplotlib 의 인터랙티브 기능을 이용해 구현했는데 정말 편했다. 클릭할 때마다 점을 추가하고 자동 학습 + 보여주는 것을 정말 간단하게 구현할 수 있었다. 이벤트 드리븐으로 짤 수 있는데다, ipython 세션이 항상 떠 있으니 디버거가 항상 같이 떠 있는 셈. 처음에는 컬러맵 (MPL 에서 pcolor 함수) 가 지원되는 줄 모르고 pyGTK 나 tkinter 써야 하나.. 고민했는데, 그거 썼으면 하루는 꼬박 더 걸렸을 듯 싶다. MPL + NumPy + SciPy 에 대한 나의 애착은 점점 커져만 가는데;;;; 튜토리얼이라도 함 쓰고 싶지만.. 아. 나 원고해야되는데. -_-

스샷 몇개 더 + 소스코드

복잡한 모양을 만들어 봤다. 그래도 이정도면 매우 준수한 (...)데...

이정도는 매우 가뿐하게.. 분류한다.

이거는 sanity check 용으로 처음에 구현했던 Linear Discriminant Analysis.. 라지만 binary class 이니 그냥 선형회귀법과 같은 셈이다.

boosting + (weighted) linear regression.

소스코드. 별게 아니라 어디 가져가시진 않겠지만.. 혹시라도 그러실거면 말씀이라도..

lang:py
# a test implementation of boosted decision stumps: jongman@gmail.com
# run in ipython: %run -i play.py

from pylab import *
from itertools import izip
import sys

x, y, label = array([]), array([]), array([], dtype=int16)

class Functor:
    def __init__(self, func, **kwargs):
        self.func = func
        self.kwargs = kwargs
    def __call__(self, *args):
        return self.func(*args, **self.kwargs)
    def __repr__(self):
        fn = self.func.func_name if type(self.func) == types.FunctionType else str(self.func)
        return fn + "(" + ", ".join(["%s=%s" % (str(k),str(v)) for k, v in self.kwargs.iteritems()]) + ")"

# minimize sum of weights of misclassified samples
def trainDecisionStump(x, y, label, w = None):
    if w == None: w = ones(len(x)) / len(x)
    X = [x, y]
    y = label
    bestFeature, bestValue, bestMultiply, minWrong = 0, 0, 1, sum(w)
    if abs(1.0 - sum(w)) > 1e-7:
        print w
        assert abs(1.0 - sum(w)) < 1e-7
    for fidx in range(len(X)):
        feature = X[fidx]
        order = argsort(feature)
        # if feature < threshold, predict -1
        incorrect = sum(w[where(label == -1)])
        for i in xrange(len(feature)):
            if i > 0 and feature[order[i]] != feature[order[i-1]]:
                mid = (feature[order[i]] + feature[order[i-1]]) * 0.5
                # assume: feature < feature[order[i]], predict -1
                wrong, mult = incorrect, 1
                # if we are more than 50% wrong, reverse
                if wrong > 0.5:
                    wrong = 1.0 - wrong
                    mult = -1
                if wrong < minWrong:
                    minWrong = wrong
                    bestFeature = fidx
                    bestValue = mid
                    bestMultiply = mult
            if label[order[i]] == -1:
                incorrect -= w[order[i]]
            else:
                incorrect += w[order[i]]
    return (bestFeature, bestValue, bestMultiply)

def predictDecisionStump(w, x, y):
    features = [x,y]
    return -1 if features[w[0]] * w[2] < w[1] * w[2] else 1

def trainAdaBoost(x, y, label, train, predict):
    n = len(x)
    if n == 0: return
    ITERATIONS = 20
    predictors = []
    alphas = []
    w = ones(n) / n
    for t in xrange(ITERATIONS):
        #print "iter %d ===, len %d" % (t, n)
        predictors.append(train(x, y, label, w))
        if predictors[-1] == None: return None
        predicted = array([predict(predictors[-1], xx, yy) for xx, yy in izip(x, y)])
        wrong = array([1 if p != l else 0 for p, l in izip(predicted, label)])
        error = dot(wrong, w)
        alphas.append(log((1-error)/error))
        #print "iteration #%d: error %.4lf alpha %.4lf" % (t, error, alphas[-1])
        if error <= 1e-8:
            alphas = [1.0]
            predictors = [predictors[-1]]
            break
        if t > 0 and error >= 0.5:
            alphas.pop()
            predictors.pop()
            break
        #print "incorrects are boosted by %.2lf" % exp(alphas[-1])
        w = w * exp(alphas[-1] * wrong)
        w = w / sum(w)

    return (predict, predictors, alphas)

def predictAdaBoost(rep, x, y, quantize = True):
    predict, predictors, alphas = rep
    ret = 0.0
    for predictor, alpha in izip(predictors, alphas):
        ret += alpha * predict(predictor, x, y)
    if quantize: return 1 if ret >= 0 else -1
    return ret

def visualizeAdaBoost(rep):
    global c
    RES = 20
    unit = 100. / RES
    x = arange(0,101,unit)
    y = arange(0,101,unit)
    X, Y = meshgrid(x, y)
    c = zeros((len(y)-1, len(x)-1))
    for yi in xrange(RES):
        yy = (yi + 0.5) * unit
        for xi in xrange(RES):
            xx = (xi + 0.5) * unit
            c[yi][xi] = predictAdaBoost(rep, xx, yy, False)
            #print "visualize (%d,%d) corresponds to (%.2lf, %.2lf) => %.2lf" % (yi,xi,yy,xx,c[yi][xi])
    pcolor(X, Y, c, vmin=-1, vmax=1, alpha=0.8, cmap=cm.get_cmap("RdBu_r"))
    if "cb" not in globals():
        global cb
        cb = colorbar()
    cb.draw_all()

def trainLinear(x, y, label, w = None):
    if w == None: w = ones(len(x))
    X = vstack((x, y, ones(len(x)))).T
    y = label
    try:
        ret = dot(inv(dot(X.T*w, X)), dot(X.T*w, y))
    except:
        # perhaps singular matrix
        return None
    return ret

def predictLinear(w, x, y):
    return 1 if dot(array([x, y, 1]), w) >= 0 else -1

def visualizeLinear(rep, width=2):
    a, b, c = rep
    # horizontal line
    if abs(a) < 1e-8:
        plot([0, 100], [-c/b, -c/b], "k:", linewidth=width)
    # vertical line
    elif abs(b) < 1e-8:
        plot([-c/a, -c/a], [0, 100], "k:", linewidth=width)
    else:
        x = arange(0,101,100)
        plot(x, (-c - a*x) / b, "k:", linewidth=width)

# use linear regression
#train, predict, visualize = trainLinear, predictLinear, visualizeLinear
# use decision stump
#train, predict, visualize = trainDecisionStump, predictDecisionStump, None
# use adaboost with decision stump
#train, predict, visualize = Functor(trainAdaBoost, train=trainDecisionStump, predict=predictDecisionStump), predictAdaBoost, visualizeAdaBoost
# use adaboost with linear regression
train, predict, visualize = Functor(trainAdaBoost, train=trainLinear, predict=predictLinear), predictAdaBoost, lambda x: visualizeAdaBoost(x) or map(lambda p: visualizeLinear(p[0], p[1] * 5), zip(x[1], x[2]))

def onclick(event):
    global x, y, label
    x = append(x, event.xdata)
    y = append(y, event.ydata)
    label = append(label, 1 if event.button == 1 else -1)
    show()

def show():
    global pred
    grid(True)
    xlim((0, 100))
    ylim((0, 100))

    fig.axes[0].collections = []
    fig.axes[0].lines = []
    if len(label) > 0:
        pred = train(x, y, label)
        if pred != None:
            predicted = array([predict(pred, xx, yy) for xx, yy in izip(x, y)])
            if visualize:
                visualize(pred)
        else:
            predicted = label
        wrong = predicted != label

        # remove what we plotted before
        for l, color in zip([-1,1], ["blue", "red"]):
            for c in [True,False]:
                idx = intersect1d(where(wrong != c)[0], where(label == l)[0])
                if len(idx) == 0: continue
                if c:
                    scatter(x[idx], y[idx], s=80, marker="o", facecolor=color, edgecolor="w", linewidth=2)
                else:
                    scatter(x[idx], y[idx], s=80, marker="x", edgecolor=color, linewidth=2)
    sys.stdout.flush()

    xlim((0, 100))
    ylim((0, 100))
    pylab.show()

if __name__ == "__main__":
    global fig
    close()
    if "cb" in globals():
        del cb
    fig = figure()
    cid = fig.canvas.mpl_connect('button_press_event', onclick)
    show()

    #set_cmap()


2010-01-13 14:19:12 | JM | /writings/cs/ | 9 Comments
dasony
2010-01-14 02:46:58
수학과 증명은 바라지도 않지만 rationale 조차 없다고? 허허. 사야지 [...]
JM
2010-01-14 03:22:00
@dasony/ ㅇㅇ 쉬운거 어려운거 하나씩 놓고 보는게 젤 좋은듯. ESL 은 pdf 공개되어 있는데 살까 말까 고민중..
큐브
2010-01-15 04:58:49
오오~ 일전에 비슷한(?) 문제로 고민했었는데.. 제 일 중에 여러개의 인공위성 데이터를 비교분석하는 게 있거든요. 위성1이 지구를 내려다볼 때 본 픽셀의 네 꼭지점이 주어졌을 때, 위성2가 주는 픽셀의 중심 좌표들 중에 "the 위성1 픽셀"에 포함되는 녀석들을 찾아내는 문제였어요. 저는 엄청 무식하게 (고1 수준 수학으로) 했었는데... 잘 보면 많이 참고 될 것 같은데 제가 영 까막눈이라서 올리신 코드를 잘 못읽겠네요.. 부끄러워라 ㅠㅠ
큐브
2010-01-15 05:04:07
근데 좀 보니까 문제만 아주^아주 약~간만 유사한 정도네요 ㅎㅎ 반가워서 덧글을 좀 길게 달았는데 ㅠㅠ
JM
2010-01-20 00:10:49
@큐브, ㅎㅎㅎ 네. 말씀하신 문제는 이거랑은 좀 다를 것 같아요. ^^; 이런 문제들은 주어진 입력 데이터들에 내재된 구조를 캡쳐하려고 하는 건데, 가장 가까운 것을 찾는 문제에서는 그 구조가 매우 단순하니까요~ :)
JM
2010-01-20 00:11:09
@큐브 근데 수학과 전공이시잖아요? 인공위성 데이터요? -_-;; applied math 하시나봐요 'ㅅ' 간지 캬
큐브
2010-01-21 03:19:20
제 전공이 어째서 수학으로 알려진건지 모르겠지만... 제 전공은 대기화학인데요 -,.-
JM
2010-01-21 04:48:24
@큐브 헐... -_-;;; 어쩐지 블로그에 올라오는 플롯이 신기하더라니.. 제가 왜 그렇게 생각했을까요... ㅠ.ㅠ 죄송
JM
2010-01-21 04:49:19
근데 대기화학도 이름 참 멋있네영. 간지

Leave a comment