# -*- coding: utf-8 -*-
"""
    werkzeug.testsuite.wrappers
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~

    Tests for the response and request objects.

    :copyright: (c) 2013 by Armin Ronacher.
    :license: BSD, see LICENSE for more details.
"""
import unittest
import pickle
from io import BytesIO
from datetime import datetime
from werkzeug._compat import iteritems

from werkzeug.testsuite import WerkzeugTestCase

from werkzeug import wrappers
from werkzeug.exceptions import SecurityError
from werkzeug.wsgi import LimitedStream
from werkzeug.datastructures import MultiDict, ImmutableOrderedMultiDict, \
     ImmutableList, ImmutableTypeConversionDict, CharsetAccept, \
     MIMEAccept, LanguageAccept, Accept, CombinedMultiDict
from werkzeug.test import Client, create_environ, run_wsgi_app
from werkzeug._compat import implements_iterator, text_type


class RequestTestResponse(wrappers.BaseResponse):
    """Subclass of the normal response class we use to test response
    and base classes.  Has some methods to test if things in the
    response match.
    """

    def __init__(self, response, status, headers):
        wrappers.BaseResponse.__init__(self, response, status, headers)
        self.body_data = pickle.loads(self.get_data())

    def __getitem__(self, key):
        return self.body_data[key]


def request_demo_app(environ, start_response):
    request = wrappers.BaseRequest(environ)
    assert 'werkzeug.request' in environ
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [pickle.dumps({
        'args':             request.args,
        'args_as_list':     list(request.args.lists()),
        'form':             request.form,
        'form_as_list':     list(request.form.lists()),
        'environ':          prepare_environ_pickle(request.environ),
        'data':             request.get_data()
    })]


def prepare_environ_pickle(environ):
    result = {}
    for key, value in iteritems(environ):
        try:
            pickle.dumps((key, value))
        except Exception:
            continue
        result[key] = value
    return result


class WrappersTestCase(WerkzeugTestCase):

    def assert_environ(self, environ, method):
        self.assert_strict_equal(environ['REQUEST_METHOD'], method)
        self.assert_strict_equal(environ['PATH_INFO'], '/')
        self.assert_strict_equal(environ['SCRIPT_NAME'], '')
        self.assert_strict_equal(environ['SERVER_NAME'], 'localhost')
        self.assert_strict_equal(environ['wsgi.version'], (1, 0))
        self.assert_strict_equal(environ['wsgi.url_scheme'], 'http')

    def test_base_request(self):
        client = Client(request_demo_app, RequestTestResponse)

        # get requests
        response = client.get('/?foo=bar&foo=hehe')
        self.assert_strict_equal(response['args'], MultiDict([('foo', u'bar'), ('foo', u'hehe')]))
        self.assert_strict_equal(response['args_as_list'], [('foo', [u'bar', u'hehe'])])
        self.assert_strict_equal(response['form'], MultiDict())
        self.assert_strict_equal(response['form_as_list'], [])
        self.assert_strict_equal(response['data'], b'')
        self.assert_environ(response['environ'], 'GET')

        # post requests with form data
        response = client.post('/?blub=blah', data='foo=blub+hehe&blah=42',
                               content_type='application/x-www-form-urlencoded')
        self.assert_strict_equal(response['args'], MultiDict([('blub', u'blah')]))
        self.assert_strict_equal(response['args_as_list'], [('blub', [u'blah'])])
        self.assert_strict_equal(response['form'], MultiDict([('foo', u'blub hehe'), ('blah', u'42')]))
        self.assert_strict_equal(response['data'], b'')
        # currently we do not guarantee that the values are ordered correctly
        # for post data.
        ## self.assert_strict_equal(response['form_as_list'], [('foo', ['blub hehe']), ('blah', ['42'])])
        self.assert_environ(response['environ'], 'POST')

        # patch requests with form data
        response = client.patch('/?blub=blah', data='foo=blub+hehe&blah=42',
                                content_type='application/x-www-form-urlencoded')
        self.assert_strict_equal(response['args'], MultiDict([('blub', u'blah')]))
        self.assert_strict_equal(response['args_as_list'], [('blub', [u'blah'])])
        self.assert_strict_equal(response['form'],
                                 MultiDict([('foo', u'blub hehe'), ('blah', u'42')]))
        self.assert_strict_equal(response['data'], b'')
        self.assert_environ(response['environ'], 'PATCH')

        # post requests with json data
        json = b'{"foo": "bar", "blub": "blah"}'
        response = client.post('/?a=b', data=json, content_type='application/json')
        self.assert_strict_equal(response['data'], json)
        self.assert_strict_equal(response['args'], MultiDict([('a', u'b')]))
        self.assert_strict_equal(response['form'], MultiDict())

    def test_query_string_is_bytes(self):
        req = wrappers.Request.from_values(u'/?foo=%2f')
        self.assert_strict_equal(req.query_string, b'foo=%2f')

    def test_access_route(self):
        req = wrappers.Request.from_values(headers={
            'X-Forwarded-For': '192.168.1.2, 192.168.1.1'
        })
        req.environ['REMOTE_ADDR'] = '192.168.1.3'
        self.assert_equal(req.access_route, ['192.168.1.2', '192.168.1.1'])
        self.assert_strict_equal(req.remote_addr, '192.168.1.3')

        req = wrappers.Request.from_values()
        req.environ['REMOTE_ADDR'] = '192.168.1.3'
        self.assert_strict_equal(list(req.access_route), ['192.168.1.3'])

    def test_url_request_descriptors(self):
        req = wrappers.Request.from_values('/bar?foo=baz', 'http://example.com/test')
        self.assert_strict_equal(req.path, u'/bar')
        self.assert_strict_equal(req.full_path, u'/bar?foo=baz')
        self.assert_strict_equal(req.script_root, u'/test')
        self.assert_strict_equal(req.url, u'http://example.com/test/bar?foo=baz')
        self.assert_strict_equal(req.base_url, u'http://example.com/test/bar')
        self.assert_strict_equal(req.url_root, u'http://example.com/test/')
        self.assert_strict_equal(req.host_url, u'http://example.com/')
        self.assert_strict_equal(req.host, 'example.com')
        self.assert_strict_equal(req.scheme, 'http')

        req = wrappers.Request.from_values('/bar?foo=baz', 'https://example.com/test')
        self.assert_strict_equal(req.scheme, 'https')

    def test_url_request_descriptors_hosts(self):
        req = wrappers.Request.from_values('/bar?foo=baz', 'http://example.com/test')
        req.trusted_hosts = ['example.com']
        self.assert_strict_equal(req.path, u'/bar')
        self.assert_strict_equal(req.full_path, u'/bar?foo=baz')
        self.assert_strict_equal(req.script_root, u'/test')
        self.assert_strict_equal(req.url, u'http://example.com/test/bar?foo=baz')
        self.assert_strict_equal(req.base_url, u'http://example.com/test/bar')
        self.assert_strict_equal(req.url_root, u'http://example.com/test/')
        self.assert_strict_equal(req.host_url, u'http://example.com/')
        self.assert_strict_equal(req.host, 'example.com')
        self.assert_strict_equal(req.scheme, 'http')

        req = wrappers.Request.from_values('/bar?foo=baz', 'https://example.com/test')
        self.assert_strict_equal(req.scheme, 'https')

        req = wrappers.Request.from_values('/bar?foo=baz', 'http://example.com/test')
        req.trusted_hosts = ['example.org']
        self.assert_raises(SecurityError, lambda: req.url)
        self.assert_raises(SecurityError, lambda: req.base_url)
        self.assert_raises(SecurityError, lambda: req.url_root)
        self.assert_raises(SecurityError, lambda: req.host_url)
        self.assert_raises(SecurityError, lambda: req.host)

    def test_authorization_mixin(self):
        request = wrappers.Request.from_values(headers={
            'Authorization': 'Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=='
        })
        a = request.authorization
        self.assert_strict_equal(a.type, 'basic')
        self.assert_strict_equal(a.username, 'Aladdin')
        self.assert_strict_equal(a.password, 'open sesame')

    def test_stream_only_mixing(self):
        request = wrappers.PlainRequest.from_values(
            data=b'foo=blub+hehe',
            content_type='application/x-www-form-urlencoded'
        )
        self.assert_equal(list(request.files.items()), [])
        self.assert_equal(list(request.form.items()), [])
        self.assert_raises(AttributeError, lambda: request.data)
        self.assert_strict_equal(request.stream.read(), b'foo=blub+hehe')

    def test_base_response(self):
        # unicode
        response = wrappers.BaseResponse(u'öäü')
        self.assert_strict_equal(response.get_data(), u'öäü'.encode('utf-8'))

        # writing
        response = wrappers.Response('foo')
        response.stream.write('bar')
        self.assert_strict_equal(response.get_data(), b'foobar')

        # set cookie
        response = wrappers.BaseResponse()
        response.set_cookie('foo', 'bar', 60, 0, '/blub', 'example.org')
        self.assert_strict_equal(response.headers.to_wsgi_list(), [
            ('Content-Type', 'text/plain; charset=utf-8'),
            ('Set-Cookie', 'foo=bar; Domain=example.org; Expires=Thu, '
             '01-Jan-1970 00:00:00 GMT; Max-Age=60; Path=/blub')
        ])

        # delete cookie
        response = wrappers.BaseResponse()
        response.delete_cookie('foo')
        self.assert_strict_equal(response.headers.to_wsgi_list(), [
            ('Content-Type', 'text/plain; charset=utf-8'),
            ('Set-Cookie', 'foo=; Expires=Thu, 01-Jan-1970 00:00:00 GMT; Max-Age=0; Path=/')
        ])

        # close call forwarding
        closed = []
        @implements_iterator
        class Iterable(object):
            def __next__(self):
                raise StopIteration()
            def __iter__(self):
                return self
            def close(self):
                closed.append(True)
        response = wrappers.BaseResponse(Iterable())
        response.call_on_close(lambda: closed.append(True))
        app_iter, status, headers = run_wsgi_app(response,
                                                 create_environ(),
                                                 buffered=True)
        self.assert_strict_equal(status, '200 OK')
        self.assert_strict_equal(''.join(app_iter), '')
        self.assert_strict_equal(len(closed), 2)

        # with statement
        del closed[:]
        response = wrappers.BaseResponse(Iterable())
        with response:
            pass
        self.assert_equal(len(closed), 1)

    def test_response_status_codes(self):
        response = wrappers.BaseResponse()
        response.status_code = 404
        self.assert_strict_equal(response.status, '404 NOT FOUND')
        response.status = '200 OK'
        self.assert_strict_equal(response.status_code, 200)
        response.status = '999 WTF'
        self.assert_strict_equal(response.status_code, 999)
        response.status_code = 588
        self.assert_strict_equal(response.status_code, 588)
        self.assert_strict_equal(response.status, '588 UNKNOWN')
        response.status = 'wtf'
        self.assert_strict_equal(response.status_code, 0)
        self.assert_strict_equal(response.status, '0 wtf')

    def test_type_forcing(self):
        def wsgi_application(environ, start_response):
            start_response('200 OK', [('Content-Type', 'text/html')])
            return ['Hello World!']
        base_response = wrappers.BaseResponse('Hello World!', content_type='text/html')

        class SpecialResponse(wrappers.Response):
            def foo(self):
                return 42

        # good enough for this simple application, but don't ever use that in
        # real world examples!
        fake_env = {}

        for orig_resp in wsgi_application, base_response:
            response = SpecialResponse.force_type(orig_resp, fake_env)
            assert response.__class__ is SpecialResponse
            self.assert_strict_equal(response.foo(), 42)
            self.assert_strict_equal(response.get_data(), b'Hello World!')
            self.assert_equal(response.content_type, 'text/html')

        # without env, no arbitrary conversion
        self.assert_raises(TypeError, SpecialResponse.force_type, wsgi_application)

    def test_accept_mixin(self):
        request = wrappers.Request({
            'HTTP_ACCEPT':  'text/xml,application/xml,application/xhtml+xml,'
                            'text/html;q=0.9,text/plain;q=0.8,image/png,*/*;q=0.5',
            'HTTP_ACCEPT_CHARSET': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
            'HTTP_ACCEPT_ENCODING': 'gzip,deflate',
            'HTTP_ACCEPT_LANGUAGE': 'en-us,en;q=0.5'
        })
        self.assert_equal(request.accept_mimetypes, MIMEAccept([
            ('text/xml', 1), ('image/png', 1), ('application/xml', 1),
            ('application/xhtml+xml', 1), ('text/html', 0.9),
            ('text/plain', 0.8), ('*/*', 0.5)
        ]))
        self.assert_strict_equal(request.accept_charsets, CharsetAccept([
            ('ISO-8859-1', 1), ('utf-8', 0.7), ('*', 0.7)
        ]))
        self.assert_strict_equal(request.accept_encodings, Accept([
            ('gzip', 1), ('deflate', 1)]))
        self.assert_strict_equal(request.accept_languages, LanguageAccept([
            ('en-us', 1), ('en', 0.5)]))

        request = wrappers.Request({'HTTP_ACCEPT': ''})
        self.assert_strict_equal(request.accept_mimetypes, MIMEAccept())

    def test_etag_request_mixin(self):
        request = wrappers.Request({
            'HTTP_CACHE_CONTROL':       'no-store, no-cache',
            'HTTP_IF_MATCH':            'w/"foo", bar, "baz"',
            'HTTP_IF_NONE_MATCH':       'w/"foo", bar, "baz"',
            'HTTP_IF_MODIFIED_SINCE':   'Tue, 22 Jan 2008 11:18:44 GMT',
            'HTTP_IF_UNMODIFIED_SINCE': 'Tue, 22 Jan 2008 11:18:44 GMT'
        })
        assert request.cache_control.no_store
        assert request.cache_control.no_cache

        for etags in request.if_match, request.if_none_match:
            assert etags('bar')
            assert etags.contains_raw('w/"foo"')
            assert etags.contains_weak('foo')
            assert not etags.contains('foo')

        self.assert_equal(request.if_modified_since, datetime(2008, 1, 22, 11, 18, 44))
        self.assert_equal(request.if_unmodified_since, datetime(2008, 1, 22, 11, 18, 44))

    def test_user_agent_mixin(self):
        user_agents = [
            ('Mozilla/5.0 (Macintosh; U; Intel Mac OS X; en-US; rv:1.8.1.11) '
             'Gecko/20071127 Firefox/2.0.0.11', 'firefox', 'macos', '2.0.0.11',
             'en-US'),
            ('Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; de-DE) Opera 8.54',
             'opera', 'windows', '8.54', 'de-DE'),
            ('Mozilla/5.0 (iPhone; U; CPU like Mac OS X; en) AppleWebKit/420 '
             '(KHTML, like Gecko) Version/3.0 Mobile/1A543a Safari/419.3',
             'safari', 'iphone', '419.3', 'en'),
            ('Bot Googlebot/2.1 ( http://www.googlebot.com/bot.html)',
             'google', None, '2.1', None)
        ]
        for ua, browser, platform, version, lang in user_agents:
            request = wrappers.Request({'HTTP_USER_AGENT': ua})
            self.assert_strict_equal(request.user_agent.browser, browser)
            self.assert_strict_equal(request.user_agent.platform, platform)
            self.assert_strict_equal(request.user_agent.version, version)
            self.assert_strict_equal(request.user_agent.language, lang)
            assert bool(request.user_agent)
            self.assert_strict_equal(request.user_agent.to_header(), ua)
            self.assert_strict_equal(str(request.user_agent), ua)

        request = wrappers.Request({'HTTP_USER_AGENT': 'foo'})
        assert not request.user_agent

    def test_stream_wrapping(self):
        class LowercasingStream(object):
            def __init__(self, stream):
                self._stream = stream
            def read(self, size=-1):
                return self._stream.read(size).lower()
            def readline(self, size=-1):
                return self._stream.readline(size).lower()

        data = b'foo=Hello+World'
        req = wrappers.Request.from_values('/', method='POST', data=data,
            content_type='application/x-www-form-urlencoded')
        req.stream = LowercasingStream(req.stream)
        self.assert_equal(req.form['foo'], 'hello world')

    def test_etag_response_mixin(self):
        response = wrappers.Response('Hello World')
        self.assert_equal(response.get_etag(), (None, None))
        response.add_etag()
        self.assert_equal(response.get_etag(), ('b10a8db164e0754105b7a99be72e3fe5', False))
        assert not response.cache_control
        response.cache_control.must_revalidate = True
        response.cache_control.max_age = 60
        response.headers['Content-Length'] = len(response.get_data())
        assert response.headers['Cache-Control'] in ('must-revalidate, max-age=60',
                                                     'max-age=60, must-revalidate')

        assert 'date' not in response.headers
        env = create_environ()
        env.update({
            'REQUEST_METHOD':       'GET',
            'HTTP_IF_NONE_MATCH':   response.get_etag()[0]
        })
        response.make_conditional(env)
        assert 'date' in response.headers

        # after the thing is invoked by the server as wsgi application
        # (we're emulating this here), there must not be any entity
        # headers left and the status code would have to be 304
        resp = wrappers.Response.from_app(response, env)
        self.assert_equal(resp.status_code, 304)
        assert not 'content-length' in resp.headers

        # make sure date is not overriden
        response = wrappers.Response('Hello World')
        response.date = 1337
        d = response.date
        response.make_conditional(env)
        self.assert_equal(response.date, d)

        # make sure content length is only set if missing
        response = wrappers.Response('Hello World')
        response.content_length = 999
        response.make_conditional(env)
        self.assert_equal(response.content_length, 999)

    def test_etag_response_mixin_freezing(self):
        class WithFreeze(wrappers.ETagResponseMixin, wrappers.BaseResponse):
            pass
        class WithoutFreeze(wrappers.BaseResponse, wrappers.ETagResponseMixin):
            pass

        response = WithFreeze('Hello World')
        response.freeze()
        self.assert_strict_equal(response.get_etag(),
            (text_type(wrappers.generate_etag(b'Hello World')), False))
        response = WithoutFreeze('Hello World')
        response.freeze()
        self.assert_equal(response.get_etag(), (None, None))
        response = wrappers.Response('Hello World')
        response.freeze()
        self.assert_equal(response.get_etag(), (None, None))

    def test_authenticate_mixin(self):
        resp = wrappers.Response()
        resp.www_authenticate.type = 'basic'
        resp.www_authenticate.realm = 'Testing'
        self.assert_strict_equal(resp.headers['WWW-Authenticate'], u'Basic realm="Testing"')
        resp.www_authenticate.realm = None
        resp.www_authenticate.type = None
        assert 'WWW-Authenticate' not in resp.headers

    def test_response_stream_mixin(self):
        response = wrappers.Response()
        response.stream.write('Hello ')
        response.stream.write('World!')
        self.assert_equal(response.response, ['Hello ', 'World!'])
        self.assert_equal(response.get_data(), b'Hello World!')

    def test_common_response_descriptors_mixin(self):
        response = wrappers.Response()
        response.mimetype = 'text/html'
        self.assert_equal(response.mimetype, 'text/html')
        self.assert_equal(response.content_type, 'text/html; charset=utf-8')
        self.assert_equal(response.mimetype_params, {'charset': 'utf-8'})
        response.mimetype_params['x-foo'] = 'yep'
        del response.mimetype_params['charset']
        self.assert_equal(response.content_type, 'text/html; x-foo=yep')

        now = datetime.utcnow().replace(microsecond=0)

        assert response.content_length is None
        response.content_length = '42'
        self.assert_equal(response.content_length, 42)

        for attr in 'date', 'age', 'expires':
            assert getattr(response, attr) is None
            setattr(response, attr, now)
            self.assert_equal(getattr(response, attr), now)

        assert response.retry_after is None
        response.retry_after = now
        self.assert_equal(response.retry_after, now)

        assert not response.vary
        response.vary.add('Cookie')
        response.vary.add('Content-Language')
        assert 'cookie' in response.vary
        self.assert_equal(response.vary.to_header(), 'Cookie, Content-Language')
        response.headers['Vary'] = 'Content-Encoding'
        self.assert_equal(response.vary.as_set(), set(['content-encoding']))

        response.allow.update(['GET', 'POST'])
        self.assert_equal(response.headers['Allow'], 'GET, POST')

        response.content_language.add('en-US')
        response.content_language.add('fr')
        self.assert_equal(response.headers['Content-Language'], 'en-US, fr')

    def test_common_request_descriptors_mixin(self):
        request = wrappers.Request.from_values(content_type='text/html; charset=utf-8',
                                               content_length='23',
                                               headers={
            'Referer':      'http://www.example.com/',
            'Date':         'Sat, 28 Feb 2009 19:04:35 GMT',
            'Max-Forwards': '10',
            'Pragma':       'no-cache',
            'Content-Encoding': 'gzip',
            'Content-MD5':      '9a3bc6dbc47a70db25b84c6e5867a072'
        })

        self.assert_equal(request.content_type, 'text/html; charset=utf-8')
        self.assert_equal(request.mimetype, 'text/html')
        self.assert_equal(request.mimetype_params, {'charset': 'utf-8'})
        self.assert_equal(request.content_length, 23)
        self.assert_equal(request.referrer, 'http://www.example.com/')
        self.assert_equal(request.date, datetime(2009, 2, 28, 19, 4, 35))
        self.assert_equal(request.max_forwards, 10)
        self.assert_true('no-cache' in request.pragma)
        self.assert_equal(request.content_encoding, 'gzip')
        self.assert_equal(request.content_md5, '9a3bc6dbc47a70db25b84c6e5867a072')

    def test_shallow_mode(self):
        request = wrappers.Request({'QUERY_STRING': 'foo=bar'}, shallow=True)
        self.assert_equal(request.args['foo'], 'bar')
        self.assert_raises(RuntimeError, lambda: request.form['foo'])

    def test_form_parsing_failed(self):
        data = (
            b'--blah\r\n'
        )
        data = wrappers.Request.from_values(input_stream=BytesIO(data),
                                            content_length=len(data),
                                            content_type='multipart/form-data; boundary=foo',
                                            method='POST')
        assert not data.files
        assert not data.form

    def test_file_closing(self):
        data = (b'--foo\r\n'
                b'Content-Disposition: form-data; name="foo"; filename="foo.txt"\r\n'
                b'Content-Type: text/plain; charset=utf-8\r\n\r\n'
                b'file contents, just the contents\r\n'
                b'--foo--')
        req = wrappers.Request.from_values(
            input_stream=BytesIO(data),
            content_length=len(data),
            content_type='multipart/form-data; boundary=foo',
            method='POST'
        )
        foo = req.files['foo']
        self.assert_equal(foo.mimetype, 'text/plain')
        self.assert_equal(foo.filename, 'foo.txt')

        self.assert_equal(foo.closed, False)
        req.close()
        self.assert_equal(foo.closed, True)

    def test_file_closing_with(self):
        data = (b'--foo\r\n'
                b'Content-Disposition: form-data; name="foo"; filename="foo.txt"\r\n'
                b'Content-Type: text/plain; charset=utf-8\r\n\r\n'
                b'file contents, just the contents\r\n'
                b'--foo--')
        req = wrappers.Request.from_values(
            input_stream=BytesIO(data),
            content_length=len(data),
            content_type='multipart/form-data; boundary=foo',
            method='POST'
        )
        with req:
            foo = req.files['foo']
            self.assert_equal(foo.mimetype, 'text/plain')
            self.assert_equal(foo.filename, 'foo.txt')

        self.assert_equal(foo.closed, True)

    def test_url_charset_reflection(self):
        req = wrappers.Request.from_values()
        req.charset = 'utf-7'
        self.assert_equal(req.url_charset, 'utf-7')

    def test_response_streamed(self):
        r = wrappers.Response()
        assert not r.is_streamed
        r = wrappers.Response("Hello World")
        assert not r.is_streamed
        r = wrappers.Response(["foo", "bar"])
        assert not r.is_streamed
        def gen():
            if 0:
                yield None
        r = wrappers.Response(gen())
        assert r.is_streamed

    def test_response_iter_wrapping(self):
        def uppercasing(iterator):
            for item in iterator:
                yield item.upper()
        def generator():
            yield 'foo'
            yield 'bar'
        req = wrappers.Request.from_values()
        resp = wrappers.Response(generator())
        del resp.headers['Content-Length']
        resp.response = uppercasing(resp.iter_encoded())
        actual_resp = wrappers.Response.from_app(resp, req.environ, buffered=True)
        self.assertEqual(actual_resp.get_data(), b'FOOBAR')

    def test_response_freeze(self):
        def generate():
            yield "foo"
            yield "bar"
        resp = wrappers.Response(generate())
        resp.freeze()
        self.assert_equal(resp.response, [b'foo', b'bar'])
        self.assert_equal(resp.headers['content-length'], '6')

    def test_other_method_payload(self):
        data = b'Hello World'
        req = wrappers.Request.from_values(input_stream=BytesIO(data),
                                           content_length=len(data),
                                           content_type='text/plain',
                                           method='WHAT_THE_FUCK')
        self.assert_equal(req.get_data(), data)
        self.assert_is_instance(req.stream, LimitedStream)

    def test_urlfication(self):
        resp = wrappers.Response()
        resp.headers['Location'] = u'http://üser:pässword@☃.net/påth'
        resp.headers['Content-Location'] = u'http://☃.net/'
        headers = resp.get_wsgi_headers(create_environ())
        self.assert_equal(headers['location'], \
            'http://%C3%BCser:p%C3%A4ssword@xn--n3h.net/p%C3%A5th')
        self.assert_equal(headers['content-location'], 'http://xn--n3h.net/')

    def test_new_response_iterator_behavior(self):
        req = wrappers.Request.from_values()
        resp = wrappers.Response(u'Hello Wörld!')

        def get_content_length(resp):
            headers = resp.get_wsgi_headers(req.environ)
            return headers.get('content-length', type=int)

        def generate_items():
            yield "Hello "
            yield u"Wörld!"

        # werkzeug encodes when set to `data` now, which happens
        # if a string is passed to the response object.
        self.assert_equal(resp.response, [u'Hello Wörld!'.encode('utf-8')])
        self.assert_equal(resp.get_data(), u'Hello Wörld!'.encode('utf-8'))
        self.assert_equal(get_content_length(resp), 13)
        assert not resp.is_streamed
        assert resp.is_sequence

        # try the same for manual assignment
        resp.set_data(u'Wörd')
        self.assert_equal(resp.response, [u'Wörd'.encode('utf-8')])
        self.assert_equal(resp.get_data(), u'Wörd'.encode('utf-8'))
        self.assert_equal(get_content_length(resp), 5)
        assert not resp.is_streamed
        assert resp.is_sequence

        # automatic generator sequence conversion
        resp.response = generate_items()
        assert resp.is_streamed
        assert not resp.is_sequence
        self.assert_equal(resp.get_data(), u'Hello Wörld!'.encode('utf-8'))
        self.assert_equal(resp.response, [b'Hello ', u'Wörld!'.encode('utf-8')])
        assert not resp.is_streamed
        assert resp.is_sequence

        # automatic generator sequence conversion
        resp.response = generate_items()
        resp.implicit_sequence_conversion = False
        assert resp.is_streamed
        assert not resp.is_sequence
        self.assert_raises(RuntimeError, lambda: resp.get_data())
        resp.make_sequence()
        self.assert_equal(resp.get_data(), u'Hello Wörld!'.encode('utf-8'))
        self.assert_equal(resp.response, [b'Hello ', u'Wörld!'.encode('utf-8')])
        assert not resp.is_streamed
        assert resp.is_sequence

        # stream makes it a list no matter how the conversion is set
        for val in True, False:
            resp.implicit_sequence_conversion = val
            resp.response = ("foo", "bar")
            assert resp.is_sequence
            resp.stream.write('baz')
            self.assert_equal(resp.response, ['foo', 'bar', 'baz'])

    def test_form_data_ordering(self):
        class MyRequest(wrappers.Request):
            parameter_storage_class = ImmutableOrderedMultiDict

        req = MyRequest.from_values('/?foo=1&bar=0&foo=3')
        self.assert_equal(list(req.args), ['foo', 'bar'])
        self.assert_equal(list(req.args.items(multi=True)), [
            ('foo', '1'),
            ('bar', '0'),
            ('foo', '3')
        ])
        self.assert_is_instance(req.args, ImmutableOrderedMultiDict)
        self.assert_is_instance(req.values, CombinedMultiDict)
        self.assert_equal(req.values['foo'], '1')
        self.assert_equal(req.values.getlist('foo'), ['1', '3'])

    def test_storage_classes(self):
        class MyRequest(wrappers.Request):
            dict_storage_class = dict
            list_storage_class = list
            parameter_storage_class = dict
        req = MyRequest.from_values('/?foo=baz', headers={
            'Cookie':   'foo=bar'
        })
        assert type(req.cookies) is dict
        self.assert_equal(req.cookies, {'foo': 'bar'})
        assert type(req.access_route) is list

        assert type(req.args) is dict
        assert type(req.values) is CombinedMultiDict
        self.assert_equal(req.values['foo'], u'baz')

        req = wrappers.Request.from_values(headers={
            'Cookie':   'foo=bar'
        })
        assert type(req.cookies) is ImmutableTypeConversionDict
        self.assert_equal(req.cookies, {'foo': 'bar'})
        assert type(req.access_route) is ImmutableList

        MyRequest.list_storage_class = tuple
        req = MyRequest.from_values()
        assert type(req.access_route) is tuple

    def test_response_headers_passthrough(self):
        headers = wrappers.Headers()
        resp = wrappers.Response(headers=headers)
        assert resp.headers is headers

    def test_response_304_no_content_length(self):
        resp = wrappers.Response('Test', status=304)
        env = create_environ()
        assert 'content-length' not in resp.get_wsgi_headers(env)

    def test_ranges(self):
        # basic range stuff
        req = wrappers.Request.from_values()
        assert req.range is None
        req = wrappers.Request.from_values(headers={'Range': 'bytes=0-499'})
        self.assert_equal(req.range.ranges, [(0, 500)])

        resp = wrappers.Response()
        resp.content_range = req.range.make_content_range(1000)
        self.assert_equal(resp.content_range.units, 'bytes')
        self.assert_equal(resp.content_range.start, 0)
        self.assert_equal(resp.content_range.stop, 500)
        self.assert_equal(resp.content_range.length, 1000)
        self.assert_equal(resp.headers['Content-Range'], 'bytes 0-499/1000')

        resp.content_range.unset()
        assert 'Content-Range' not in resp.headers

        resp.headers['Content-Range'] = 'bytes 0-499/1000'
        self.assert_equal(resp.content_range.units, 'bytes')
        self.assert_equal(resp.content_range.start, 0)
        self.assert_equal(resp.content_range.stop, 500)
        self.assert_equal(resp.content_range.length, 1000)

    def test_auto_content_length(self):
        resp = wrappers.Response('Hello World!')
        self.assert_equal(resp.content_length, 12)

        resp = wrappers.Response(['Hello World!'])
        assert resp.content_length is None
        self.assert_equal(resp.get_wsgi_headers({})['Content-Length'], '12')

    def test_disabled_auto_content_length(self):
        class MyResponse(wrappers.Response):
            automatically_set_content_length = False
        resp = MyResponse('Hello World!')
        self.assert_is_none(resp.content_length)

        resp = MyResponse(['Hello World!'])
        self.assert_is_none(resp.content_length)
        self.assert_not_in('Content-Length', resp.get_wsgi_headers({}))

    def test_location_header_autocorrect(self):
        env = create_environ()
        class MyResponse(wrappers.Response):
            autocorrect_location_header = False
        resp = MyResponse('Hello World!')
        resp.headers['Location'] = '/test'
        self.assert_equal(resp.get_wsgi_headers(env)['Location'], '/test')

        resp = wrappers.Response('Hello World!')
        resp.headers['Location'] = '/test'
        self.assert_equal(resp.get_wsgi_headers(env)['Location'], 'http://localhost/test')

    def test_modified_url_encoding(self):
        class ModifiedRequest(wrappers.Request):
            url_charset = 'euc-kr'

        req = ModifiedRequest.from_values(u'/?foo=정상처리'.encode('euc-kr'))
        self.assert_strict_equal(req.args['foo'], u'정상처리')

def suite():
    suite = unittest.TestSuite()
    suite.addTest(unittest.makeSuite(WrappersTestCase))
    return suite