invicnaper/MWF

View on GitHub
Utils/t/proto_test.py

Summary

Maintainability
D
2 days
Test Coverage
#!/usr/bin/env python
# coding=utf-8
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
import sys
import socket
import time
import os.path
import traceback
import random
import toscgi
import tofcgi

def _join_like_print(payload, marker):
    """Return payload and marker laid out as ``print payload, marker`` does.

    The Python 2 print statement puts a single space between two items
    unless the first one ends in a whitespace character other than a
    blank (typically a newline).  Reproducing that rule keeps the failure
    report identical to the historical output.
    """
    last = payload[-1:]
    gap = '' if last.isspace() and last != ' ' else ' '
    return payload + gap + marker


def check(name,A,B):
    """Compare an actual result with the expected one; abort on mismatch.

    name -- label printed together with the verdict.
    A    -- actual value produced by the server under test.
    B    -- expected value.

    When the values are equal, "Ok:<name>" is printed and the function
    returns.  Otherwise both values are printed, dumped to ``actual.txt``
    and ``expected.txt`` in the current directory, and the process is
    terminated through ``sys.exit(1)``.
    """
    if A == B:
        print("Ok:" + name)
        return

    # Single-argument print calls behave the same as the print statement on
    # Python 2 and are also valid syntax on Python 3.
    print("Error :" + name)
    print("-----Actual--")
    print(_join_like_print(A, "---Expected--"))
    print(_join_like_print(B, "-------------"))

    # Context managers make sure each dump is flushed and closed even when
    # a write fails, instead of relying on the explicit close() calls.
    with open('actual.txt', 'wb') as actual_dump:
        actual_dump.write(A)
    with open('expected.txt', 'wb') as expected_dump:
        expected_dump.write(B)
    sys.exit(1)

class Nothing:
    """Empty placeholder class; it defines no attributes or methods."""

def identity(x):
    """Return the argument unchanged.

    Serves as the default ``parse`` callback of the test helpers, i.e. the
    raw server reply is compared as-is.
    """
    unchanged = x
    return unchanged

def load_file(file_name):
    """Return the raw contents of a data file stored next to this script.

    file_name -- path relative to the directory of ``sys.argv[0]``.

    The file is read in binary mode.  ``os.path.join`` is used so that an
    empty script directory (script started as ``python proto_test.py``)
    resolves to the current directory instead of the filesystem root,
    which is what plain ``dirname + "/" + file_name`` would produce.
    """
    path = os.path.join(os.path.dirname(sys.argv[0]), file_name)
    with open(path, 'rb') as source:
        return source.read()

def get_socket():
    """Open a stream connection to the server under test.

    Uses the module-level ``socket_type`` and ``target`` that the
    command-line handling at the bottom of the script assigns.  The names
    are only read here, so no ``global`` declaration is required.

    Returns the connected socket.  For AF_INET connections TCP_NODELAY is
    switched on (presumably so that the deliberately tiny writes of the
    tests are not coalesced -- confirm against the server tests).

    Raises socket.error when connecting or configuring fails; the
    half-open socket is closed before the error propagates.
    """
    s = socket.socket(socket_type, socket.SOCK_STREAM)
    try:
        s.connect(target)
        if socket_type == socket.AF_INET:
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    except socket.error:
        # Do not leak the descriptor when the server is unreachable.
        s.close()
        raise
    return s

def _shutdown_and_close(sock):
    """Shut down both directions of sock (best effort) and always close it."""
    try:
        # Fall back to the numeric constant 2 when socket.SHUT_RDWR is absent.
        sock.shutdown(getattr(socket, 'SHUT_RDWR', 2))
    except socket.error:
        # The peer may already have dropped the connection.
        pass
    finally:
        sock.close()


def _drain(sock, received, next_size, pause=0):
    """Append everything sock delivers to ``received`` until end of stream.

    next_size -- callable returning the buffer size for the next recv().
    pause     -- seconds to sleep after each non-empty chunk (0: no sleep).
    """
    while True:
        chunk = sock.recv(next_size())
        if not chunk:
            return
        if pause:
            time.sleep(pause)
        received.append(chunk)


def test_io(name,method,input_f,output_f,seed=12,load=load_file,parse=identity):
    """Send one request to the server and compare the reply with a file.

    name     -- label handed to check() for the verdict line.
    method   -- how the request is written and the reply is read:
                'normal'   one sendall(), reads of up to 1024 bytes;
                'shortest' one byte per send and per read, 1 ms apart;
                'random'   chunks of 1..16 bytes, sizes drawn from the
                           random module after seeding it with ``seed``.
    input_f  -- request file name, passed to ``load``.
    output_f -- expected-reply file name, always read with load_file().
    seed     -- seed for the 'random' method.
    load     -- callable turning input_f into the bytes to send.
    parse    -- callable applied to the raw reply before comparison.

    An unknown method prints a message and exits with status 1 before any
    file or socket is touched.  The verdict is delegated to check(), which
    exits the process on mismatch.
    """
    if method not in ('normal', 'shortest', 'random'):
        print('Unknown method ' + str(method))
        sys.exit(1)

    request = load(input_f)
    expected = load_file(output_f)

    # Chunks are collected in a list and joined once, avoiding repeated
    # string concatenation while reading.
    received = []
    sock = None
    try:
        sock = get_socket()
        if method == 'normal':
            sock.sendall(request)
            _drain(sock, received, lambda: 1024)
        elif method == 'shortest':
            for char in request:
                sock.sendall(char)
                time.sleep(0.001)
            _drain(sock, received, lambda: 1, 0.001)
        else:
            # The order of the random.randint() calls is kept as before so
            # a given seed still produces the same chunking.
            random.seed(seed)
            sent = 0
            while sent < len(request):
                piece = request[sent:sent + random.randint(1, 16)]
                sent += len(piece)
                sock.sendall(piece)
                time.sleep(0.001)
            _drain(sock, received, lambda: random.randint(1, 16), 0.001)
    except socket.error:
        # Deliberate best effort: a socket failure is not fatal by itself;
        # whatever was received so far is still compared below.
        pass
    finally:
        if sock is not None:
            _shutdown_and_close(sock)
    check(name, parse(''.join(received)), expected)


def test_all(name,load=load_file,parse=identity):
    """Run the complete set of transfer variants for one test case.

    The request is read from ``proto_test_<name>.in`` and the expected
    reply from ``proto_test_<name>.out``.  Five exchanges are performed in
    order: 'normal', three 'random' runs (seeds 1, 15 and 215) and
    'shortest'.  ``load`` and ``parse`` are forwarded to test_io().
    """
    stem = 'proto_test_' + name
    input_f = stem + '.in'
    output_f = stem + '.out'
    variants = (
        (' normal', 'normal', 0),
        (' random 1', 'random', 1),
        (' random 2', 'random', 15),
        (' random 3', 'random', 215),
        (' shortest', 'shortest', 0),
    )
    for suffix, method, seed in variants:
        test_io(name + suffix, method, input_f, output_f, seed, load, parse)

def test_normal(name,load=load_file,parse=identity):
    """Run only the 'normal' transfer variant for one test case.

    Uses ``proto_test_<name>.in`` as request and ``proto_test_<name>.out``
    as expected reply; ``load`` and ``parse`` are forwarded to test_io().
    """
    stem = 'proto_test_' + name
    test_io(name + ' normal', 'normal', stem + '.in', stem + '.out', 0, load, parse)

def test_scgi(name):
    """Run all transfer variants for ``name`` with an SCGI-encoded request.

    The request file is read with load_file() and converted by
    toscgi.toscgi() before being sent; the reply is compared unparsed.
    """
    def load_as_scgi(file_name):
        return toscgi.toscgi(load_file(file_name))

    test_all(name, load_as_scgi)

def transfer_all(s,inp):
    """Send ``inp`` over ``s`` and collect the reply until the peer goes quiet.

    s   -- connected socket; it is left in non-blocking mode on return.
    inp -- request bytes, written with a blocking sendall().

    The reply is read in non-blocking mode.  When recv() fails with
    socket.error the function sleeps 0.1 s once and retries; a second
    consecutive failure ends the transfer and the collected data is
    returned.  Any received data resets that grace period.

    Raises Exception('Unexpected end of file') if the peer closes the
    connection, since the connection is expected to stay open.
    """
    s.setblocking(1)
    s.sendall(inp)
    s.setblocking(0)
    # Chunks are joined once at the end instead of growing a string with
    # "+=".  b'' is a plain str on Python 2.6+, so the result type there is
    # unchanged.
    pieces = []
    idle = False
    while True:
        try:
            tmp = s.recv(1024)
        except socket.error:
            # Kept broad on purpose: any socket error counts as "no data".
            if idle:
                return b''.join(pieces)
            time.sleep(0.1)
            idle = True
        else:
            if not tmp:
                raise Exception('Unexpected end of file')
            pieces.append(tmp)
            idle = False

def test_fcgi_keep_alive(name):
    """Send the same FastCGI request twice over a single connection.

    The request from ``proto_test_<name>.in`` is encoded with
    tofcgi.to_fcgi_request() using the TEST_KEEP_ALIVE flag.  Each reply is
    decoded with tofcgi.from_fcgi_response() and compared against
    ``proto_test_<name>.out`` via check().  The socket is shut down and
    closed after both round trips.
    """
    stem = 'proto_test_' + name
    request = tofcgi.to_fcgi_request(load_file(stem + '.in'), tofcgi.TEST_KEEP_ALIVE)
    expected = load_file(stem + '.out')
    conn = get_socket()
    for ordinal in ('first', 'second'):
        raw = transfer_all(conn, request)
        decoded = tofcgi.from_fcgi_response(raw, tofcgi.TEST_KEEP_ALIVE)
        check('fcgi keep alive ' + ordinal + ' ' + name, decoded, expected)
    # Use the named constant when the socket module provides it, else 2.
    conn.shutdown(getattr(socket, 'SHUT_RDWR', 2))
    conn.close()




def test_fcgi(name,flags = 0):
    """Run the FastCGI variant of test case ``name``.

    The request is encoded with tofcgi.to_fcgi_request() and the reply is
    decoded with tofcgi.from_fcgi_response(), both receiving ``flags``.
    With ``flags == 0`` every transfer variant is exercised (test_all);
    any other value runs only the 'normal' variant (test_normal).
    """
    def encode_request(file_name):
        return tofcgi.to_fcgi_request(load_file(file_name), flags)

    def decode_response(raw):
        return tofcgi.from_fcgi_response(raw, flags)

    runner = test_all if flags == 0 else test_normal
    runner(name, encode_request, decode_response)




# NOTE: at module scope a ``global`` statement has no effect; these two lines
# merely announce the names.  ``target`` and ``socket_type`` are assigned
# further down from the command-line argument and read by get_socket().
global target
global socket_type

def usege():
    """Print the command-line synopsis and exit with status 1."""
    # A single-argument print call produces the same output as the print
    # statement on Python 2 and is also valid syntax on Python 3.
    print('Usage proto_test.py (http|fastcgi_tcp|fastcgi_unix|scgi_tcp|scgi_unix)')
    sys.exit(1)

# Command-line driver: exactly one argument selects the protocol to test.
if len(sys.argv) != 2:
    usege()

test=sys.argv[1]

# The TCP modes connect to localhost:8080; every other value falls back to
# the UNIX-domain socket (unknown values are rejected further down, before
# any connection is attempted).
if test in ('http', 'fastcgi_tcp', 'scgi_tcp'):
    target=('localhost',8080)
    socket_type=socket.AF_INET
else:
    # An AF_UNIX address is a plain path string, not a tuple.
    target='/tmp/cppcms_test_socket'
    socket_type=socket.AF_UNIX

# Single-argument print calls below behave like the print statement on
# Python 2 and are also valid syntax on Python 3.
if test=='http':
    for case in ('http_1', 'http_2', 'http_3', 'http_4', 'http_5'):
        test_all(case)
elif test in ('fastcgi_tcp', 'fastcgi_unix'):
    # The FastCGI runs reuse the scgi_* data files.
    for case in ('scgi_1', 'scgi_2', 'scgi_3'):
        test_fcgi_keep_alive(case)
    for case in ('scgi_1', 'scgi_2', 'scgi_3'):
        test_fcgi(case)
    print("Testing big pairs")
    for case in ('scgi_4', 'scgi_5'):
        test_fcgi_keep_alive(case)
    for case in ('scgi_4', 'scgi_5'):
        test_fcgi(case)
    print("Testing chunked pairs")
    for case in ('scgi_4', 'scgi_5'):
        test_fcgi(case,tofcgi.TEST_RANDOM)
    print("Testing GET_VALUES")
    test_fcgi('scgi_1',tofcgi.TEST_GET_VALUES)
elif test in ('scgi_tcp', 'scgi_unix'):
    for case in ('scgi_1', 'scgi_2', 'scgi_3'):
        test_scgi(case)
else:
    usege()