JMK no matter what

ANN classification

옛날에 해봤던 boosted stumps 에 이어, 얼마 전에 PyBrain 발견한 김에 한번 해봐야겠다 하고 적용했다.

같은 입력에 대해 은닉층 노드 4개 놓은 ANN 과 boosted stumps 의 비교. 은닉층은 2개까지 줄여봤는데 별 차이는 없었다. 물론 1개로 줄이면 선형회귀가 되니 흠좀무.

이런 구조를 갖는 문제에서는 역시 ANN 이 훨~씬~ 훨~씬~ 나은듯. 근데 내가 지금 풀고자 하는 문제는 사실 이런게 아닌데 이런걸로 직관 얻으려고 해봐야... ㅡ,.ㅡ 그래도 재미는 있구나. 흠

아, 실제로는 PyBrain 튜토리얼 따라서 XOR classification 해보려고 했는데 그것도 안되는 바람에 (!) ffnet 이라는 라이브러리로 전환. PyBrain 보다 훨씬 나은듯. 학습하고 나면 포트란 pre-written 소스코드로 익스포트도 해준다. 헐퀴

소스코드

lang:py
# a test implementation of boosted decision stumps: jongman@gmail.com
# run in ipython: %run -i play.py

from pylab import *
from itertools import izip
import sys

x, y, label = array([]), array([]), array([], dtype=int16)

class Functor:
    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs
    def __call__(self, *args):
        return self.func(*(self.args + args), **self.kwargs)
    def __repr__(self):
        fn = self.func.func_name if type(self.func) == types.FunctionType else str(self.func)
        return (fn + "("
                + ", ".join("%s" % str(v) for v in self.args)
                + ", ".join(["%s=%s" % (str(k),str(v))
                             for k, v in self.kwargs.iteritems()]) + ")")

# minimize sum of weights of misclassified samples
def trainDecisionStump(x, y, label, w = None):
    if w == None: w = ones(len(x)) / len(x)
    X = [x, y]
    y = label
    bestFeature, bestValue, bestMultiply, minWrong = 0, 0, 1, sum(w)
    if abs(1.0 - sum(w)) > 1e-7:
        print w
        assert abs(1.0 - sum(w)) < 1e-7
    for fidx in range(len(X)):
        feature = X[fidx]
        order = argsort(feature)
        # if feature < threshold, predict -1
        incorrect = sum(w[where(label == -1)])
        for i in xrange(len(feature)):
            if i > 0 and feature[order[i]] != feature[order[i-1]]:
                mid = (feature[order[i]] + feature[order[i-1]]) * 0.5
                # assume: feature < feature[order[i]], predict -1
                wrong, mult = incorrect, 1
                # if we are more than 50% wrong, reverse
                if wrong > 0.5:
                    wrong = 1.0 - wrong
                    mult = -1
                if wrong < minWrong:
                    minWrong = wrong
                    bestFeature = fidx
                    bestValue = mid
                    bestMultiply = mult
            if label[order[i]] == -1:
                incorrect -= w[order[i]]
            else:
                incorrect += w[order[i]]
    return (bestFeature, bestValue, bestMultiply)

def predictDecisionStump(w, x, y):
    features = [x,y]
    return -1 if features[w[0]] * w[2] < w[1] * w[2] else 1

def trainAdaBoost(x, y, label, train, predict):
    n = len(x)
    if n == 0: return
    ITERATIONS = 20
    predictors = []
    alphas = []
    w = ones(n) / n
    for t in xrange(ITERATIONS):
        #print "iter %d ===, len %d" % (t, n)
        predictors.append(train(x, y, label, w))
        if predictors[-1] == None: return None
        predicted = array([predict(predictors[-1], xx, yy) for xx, yy in izip(x, y)])
        wrong = array([1 if p != l else 0 for p, l in izip(predicted, label)])
        error = dot(wrong, w)
        alphas.append(log((1-error)/error))
        #print "iteration #%d: error %.4lf alpha %.4lf" % (t, error, alphas[-1])
        if error <= 1e-8:
            alphas = [1.0]
            predictors = [predictors[-1]]
            break
        if t > 0 and error >= 0.5:
            alphas.pop()
            predictors.pop()
            break
        #print "incorrects are boosted by %.2lf" % exp(alphas[-1])
        w = w * exp(alphas[-1] * wrong)
        w = w / sum(w)

    return (predict, predictors, alphas)

def predictAdaBoost(rep, x, y, quantize = True):
    predict, predictors, alphas = rep
    ret = 0.0
    for predictor, alpha in izip(predictors, alphas):
        ret += alpha * predict(predictor, x, y)
    if quantize: return 1 if ret >= 0 else -1
    return ret

def visualizeGrid(func, rep):
    global c
    RES = 20
    unit = 100. / RES
    x = arange(0,101,unit)
    y = arange(0,101,unit)
    X, Y = meshgrid(x, y)
    c = zeros((len(y)-1, len(x)-1))
    for yi in xrange(RES):
        yy = (yi + 0.5) * unit
        for xi in xrange(RES):
            xx = (xi + 0.5) * unit
            c[yi][xi] = func(rep, xx, yy, False)
            #predictAdaBoost(rep, xx, yy, False)
            #print "visualize (%d,%d) corresponds to (%.2lf, %.2lf) => %.2lf" % (yi,xi,yy,xx,c[yi][xi])
    pcolor(X, Y, c, vmin=-1, vmax=1, alpha=0.8, cmap=cm.get_cmap("RdBu_r"))
    if "cb" not in globals():
        global cb
        cb = colorbar()
    cb.draw_all()

visualizeAdaBoost = Functor(visualizeGrid, predictAdaBoost)

def trainNN(x, y, label):
    from ffnet import ffnet, mlgraph
    input = zip(x, y)
    target = [[l] for l in label]

    conec = mlgraph((2, 4, 1))
    net = ffnet(conec)

    net.train_genetic(input, target, individuals=20, generations=500)
    net.train_tnc(input, target, maxfun=1000, messages=1)
    return net

def predictNN(rep, x, y, quantize = True):
    ret = rep((x, y))
    if quantize: return -1 if ret <= 0 else 1
    return ret

visualizeNN = Functor(visualizeGrid, predictNN)

def trainLinear(x, y, label, w = None):
    if w == None: w = ones(len(x))
    X = vstack((x, y, ones(len(x)))).T
    y = label
    try:
        ret = dot(inv(dot(X.T*w, X)), dot(X.T*w, y))
    except:
        # perhaps singular matrix
        return None
    return ret

def predictLinear(w, x, y):
    return 1 if dot(array([x, y, 1]), w) >= 0 else -1

def visualizeLinear(rep, width=2):
    a, b, c = rep
    # horizontal line
    if abs(a) < 1e-8:
        plot([0, 100], [-c/b, -c/b], "k:", linewidth=width)
    # vertical line
    elif abs(b) < 1e-8:
        plot([-c/a, -c/a], [0, 100], "k:", linewidth=width)
    else:
        x = arange(0,101,100)
        plot(x, (-c - a*x) / b, "k:", linewidth=width)

# use linear regression
#train, predict, visualize = trainLinear, predictLinear, visualizeLinear

# use decision stump
#train, predict, visualize = trainDecisionStump, predictDecisionStump, None

# use adaboost with decision stump
#train, predict, visualize = Functor(trainAdaBoost, train=trainDecisionStump, predict=predictDecisionStump), predictAdaBoost, visualizeAdaBoost

# use adaboost with linear regression
#train, predict, visualize = Functor(trainAdaBoost, train=trainLinear, predict=predictLinear), predictAdaBoost, lambda x: visualizeAdaBoost(x) or map(lambda p: visualizeLinear(p[0], p[1] * 5), zip(x[1], x[2]))

# use neural network
train, predict, visualize = trainNN, predictNN, visualizeNN

def onclick(event):
    global x, y, label
    x = append(x, event.xdata)
    y = append(y, event.ydata)
    label = append(label, 1 if event.button == 1 else -1)
    show()

def save():
    return (x, y, label)

def load(rep):
    global x, y, label
    x, y, label = rep
    show()

def show():
    global pred
    grid(True)
    xlim((0, 100))
    ylim((0, 100))

    fig.axes[0].collections = []
    fig.axes[0].lines = []
    if len(label) > 0:
        pred = train(x, y, label)
        if pred != None:
            predicted = array([predict(pred, xx, yy) for xx, yy in izip(x, y)])
            if visualize:
                visualize(pred)
        else:
            predicted = label
        print "label", label
        print "predicted", predicted
        wrong = predicted != label

        # remove what we plotted before
        for l, color in zip([-1,1], ["blue", "red"]):
            for c in [True,False]:
                idx = intersect1d(where(wrong != c)[0], where(label == l)[0])
                if len(idx) == 0: continue
                if c:
                    scatter(x[idx], y[idx], s=80, marker="o", facecolor=color, edgecolor="w", linewidth=2)
                else:
                    scatter(x[idx], y[idx], s=80, marker="x", edgecolor=color, linewidth=2)
    sys.stdout.flush()

    xlim((0, 100))
    ylim((0, 100))
    pylab.show()

if __name__ == "__main__":
    global fig
    close()
    if "cb" in globals():
        del cb
    fig = figure()
    cid = fig.canvas.mpl_connect('button_press_event', onclick)
    show()

점점~ 길어지고~ 있구나..

2010-04-18 15:18:53 | JM | /writings/cs/ | 0 Comments

Leave a comment