#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vi:ts=4:et
from . import localhost
import pycurl
import unittest
import nose.tools
import select
from . import appmanager
from . import util
setup_module_1, teardown_module_1 = appmanager.setup(('app', 8380))
setup_module_2, teardown_module_2 = appmanager.setup(('app', 8381))
setup_module_3, teardown_module_3 = appmanager.setup(('app', 8382))
def setup_module(mod):
setup_module_1(mod)
setup_module_2(mod)
setup_module_3(mod)
def teardown_module(mod):
teardown_module_3(mod)
teardown_module_2(mod)
teardown_module_1(mod)
class MultiTest(unittest.TestCase):
def test_multi(self):
io1 = util.BytesIO()
io2 = util.BytesIO()
m = pycurl.CurlMulti()
handles = []
c1 = util.DefaultCurl()
c2 = util.DefaultCurl()
c1.setopt(c1.URL, 'http://%s:8380/success' % localhost)
c1.setopt(c1.WRITEFUNCTION, io1.write)
c2.setopt(c2.URL, 'http://%s:8381/success' % localhost)
c2.setopt(c1.WRITEFUNCTION, io2.write)
m.add_handle(c1)
m.add_handle(c2)
handles.append(c1)
handles.append(c2)
num_handles = len(handles)
while num_handles:
while 1:
ret, num_handles = m.perform()
if ret != pycurl.E_CALL_MULTI_PERFORM:
break
m.select(1.0)
m.remove_handle(c2)
m.remove_handle(c1)
m.close()
c1.close()
c2.close()
self.assertEqual('success', io1.getvalue().decode())
self.assertEqual('success', io2.getvalue().decode())
def test_multi_select_fdset(self):
c1 = util.DefaultCurl()
c2 = util.DefaultCurl()
c3 = util.DefaultCurl()
c1.setopt(c1.URL, "http://%s:8380/success" % localhost)
c2.setopt(c2.URL, "http://%s:8381/success" % localhost)
c3.setopt(c3.URL, "http://%s:8382/success" % localhost)
c1.body = util.BytesIO()
c2.body = util.BytesIO()
c3.body = util.BytesIO()
c1.setopt(c1.WRITEFUNCTION, c1.body.write)
c2.setopt(c2.WRITEFUNCTION, c2.body.write)
c3.setopt(c3.WRITEFUNCTION, c3.body.write)
m = pycurl.CurlMulti()
m.add_handle(c1)
m.add_handle(c2)
m.add_handle(c3)
# Number of seconds to wait for a timeout to happen
SELECT_TIMEOUT = 0.1
# Stir the state machine into action
while 1:
ret, num_handles = m.perform()
if ret != pycurl.E_CALL_MULTI_PERFORM:
break
# Keep going until all the connections have terminated
while num_handles:
select.select(*m.fdset() + (SELECT_TIMEOUT,))
while 1:
ret, num_handles = m.perform()
if ret != pycurl.E_CALL_MULTI_PERFORM:
break
# Cleanup
m.remove_handle(c3)
m.remove_handle(c2)
m.remove_handle(c1)
m.close()
c1.close()
c2.close()
c3.close()
self.assertEqual('success', c1.body.getvalue().decode())
self.assertEqual('success', c2.body.getvalue().decode())
self.assertEqual('success', c3.body.getvalue().decode())
def test_multi_status_codes(self):
# init
m = pycurl.CurlMulti()
m.handles = []
urls = [
'http://%s:8380/success' % localhost,
'http://%s:8381/status/403' % localhost,
'http://%s:8382/status/404' % localhost,
]
for url in urls:
c = util.DefaultCurl()
# save info in standard Python attributes
c.url = url.rstrip()
c.body = util.BytesIO()
c.http_code = -1
m.handles.append(c)
# pycurl API calls
c.setopt(c.URL, c.url)
c.setopt(c.WRITEFUNCTION, c.body.write)
m.add_handle(c)
# get data
num_handles = len(m.handles)
while num_handles:
while 1:
ret, num_handles = m.perform()
if ret != pycurl.E_CALL_MULTI_PERFORM:
break
# currently no more I/O is pending, could do something in the meantime
# (display a progress bar, etc.)
m.select(0.1)
# close handles
for c in m.handles:
# save info in standard Python attributes
c.http_code = c.getinfo(c.HTTP_CODE)
# pycurl API calls
m.remove_handle(c)
c.close()
m.close()
# check result
self.assertEqual('success', m.handles[0].body.getvalue().decode())
self.assertEqual(200, m.handles[0].http_code)
# bottle generated response body
self.assertEqual('forbidden', m.handles[1].body.getvalue().decode())
self.assertEqual(403, m.handles[1].http_code)
# bottle generated response body
self.assertEqual('not found', m.handles[2].body.getvalue().decode())
self.assertEqual(404, m.handles[2].http_code)
def check_adding_closed_handle(self, close_fn):
# init
m = pycurl.CurlMulti()
m.handles = []
urls = [
'http://%s:8380/success' % localhost,
'http://%s:8381/status/403' % localhost,
'http://%s:8382/status/404' % localhost,
]
for url in urls:
c = util.DefaultCurl()
# save info in standard Python attributes
c.url = url
c.body = util.BytesIO()
c.http_code = -1
c.debug = 0
m.handles.append(c)
# pycurl API calls
c.setopt(c.URL, c.url)
c.setopt(c.WRITEFUNCTION, c.body.write)
m.add_handle(c)
# debug - close a handle
c = m.handles[2]
c.debug = 1
c.close()
# get data
num_handles = len(m.handles)
while num_handles:
while 1:
ret, num_handles = m.perform()
if ret != pycurl.E_CALL_MULTI_PERFORM:
break
# currently no more I/O is pending, could do something in the meantime
# (display a progress bar, etc.)
m.select(0.1)
# close handles
for c in m.handles:
# save info in standard Python attributes
try:
c.http_code = c.getinfo(c.HTTP_CODE)
except pycurl.error:
# handle already closed - see debug above
assert c.debug
c.http_code = -1
# pycurl API calls
close_fn(m, c)
m.close()
# check result
self.assertEqual('success', m.handles[0].body.getvalue().decode())
self.assertEqual(200, m.handles[0].http_code)
# bottle generated response body
self.assertEqual('forbidden', m.handles[1].body.getvalue().decode())
self.assertEqual(403, m.handles[1].http_code)
# bottle generated response body
self.assertEqual('', m.handles[2].body.getvalue().decode())
self.assertEqual(-1, m.handles[2].http_code)
def _remove_then_close(self, m, c):
m.remove_handle(c)
c.close()
def _close_then_remove(self, m, c):
# in the C API this is the wrong calling order, but pycurl
# handles this automatically
c.close()
m.remove_handle(c)
def _close_without_removing(self, m, c):
# actually, remove_handle is called automatically on close
c.close
def test_adding_closed_handle_remove_then_close(self):
self.check_adding_closed_handle(self._remove_then_close)
def test_adding_closed_handle_close_then_remove(self):
self.check_adding_closed_handle(self._close_then_remove)
def test_adding_closed_handle_close_without_removing(self):
self.check_adding_closed_handle(self._close_without_removing)
def test_multi_select(self):
c1 = util.DefaultCurl()
c2 = util.DefaultCurl()
c3 = util.DefaultCurl()
c1.setopt(c1.URL, "http://%s:8380/success" % localhost)
c2.setopt(c2.URL, "http://%s:8381/success" % localhost)
c3.setopt(c3.URL, "http://%s:8382/success" % localhost)
c1.body = util.BytesIO()
c2.body = util.BytesIO()
c3.body = util.BytesIO()
c1.setopt(c1.WRITEFUNCTION, c1.body.write)
c2.setopt(c2.WRITEFUNCTION, c2.body.write)
c3.setopt(c3.WRITEFUNCTION, c3.body.write)
m = pycurl.CurlMulti()
m.add_handle(c1)
m.add_handle(c2)
m.add_handle(c3)
# Number of seconds to wait for a timeout to happen
SELECT_TIMEOUT = 1.0
# Stir the state machine into action
while 1:
ret, num_handles = m.perform()
if ret != pycurl.E_CALL_MULTI_PERFORM:
break
# Keep going until all the connections have terminated
while num_handles:
# The select method uses fdset internally to determine which file descriptors
# to check.
m.select(SELECT_TIMEOUT)
while 1:
ret, num_handles = m.perform()
if ret != pycurl.E_CALL_MULTI_PERFORM:
break
# Cleanup
m.remove_handle(c3)
m.remove_handle(c2)
m.remove_handle(c1)
m.close()
c1.close()
c2.close()
c3.close()
self.assertEqual('success', c1.body.getvalue().decode())
self.assertEqual('success', c2.body.getvalue().decode())
self.assertEqual('success', c3.body.getvalue().decode())
def test_multi_info_read(self):
c1 = util.DefaultCurl()
c2 = util.DefaultCurl()
c3 = util.DefaultCurl()
c1.setopt(c1.URL, "http://%s:8380/short_wait" % localhost)
c2.setopt(c2.URL, "http://%s:8381/short_wait" % localhost)
c3.setopt(c3.URL, "http://%s:8382/short_wait" % localhost)
c1.body = util.BytesIO()
c2.body = util.BytesIO()
c3.body = util.BytesIO()
c1.setopt(c1.WRITEFUNCTION, c1.body.write)
c2.setopt(c2.WRITEFUNCTION, c2.body.write)
c3.setopt(c3.WRITEFUNCTION, c3.body.write)
m = pycurl.CurlMulti()
m.add_handle(c1)
m.add_handle(c2)
m.add_handle(c3)
# Number of seconds to wait for a timeout to happen
SELECT_TIMEOUT = 1.0
# Stir the state machine into action
while 1:
ret, num_handles = m.perform()
if ret != pycurl.E_CALL_MULTI_PERFORM:
break
infos = []
# Keep going until all the connections have terminated
while num_handles:
# The select method uses fdset internally to determine which file descriptors
# to check.
m.select(SELECT_TIMEOUT)
while 1:
ret, num_handles = m.perform()
info = m.info_read()
infos.append(info)
if ret != pycurl.E_CALL_MULTI_PERFORM:
break
all_handles = []
for info in infos:
handles = info[1]
# last info is an empty array
if handles:
all_handles.extend(handles)
self.assertEqual(3, len(all_handles))
assert c1 in all_handles
assert c2 in all_handles
assert c3 in all_handles
# Cleanup
m.remove_handle(c3)
m.remove_handle(c2)
m.remove_handle(c1)
m.close()
c1.close()
c2.close()
c3.close()
self.assertEqual('success', c1.body.getvalue().decode())
self.assertEqual('success', c2.body.getvalue().decode())
self.assertEqual('success', c3.body.getvalue().decode())
def test_multi_close(self):
m = pycurl.CurlMulti()
m.close()
def test_multi_close_twice(self):
m = pycurl.CurlMulti()
m.close()
m.close()
# positional arguments are rejected
@nose.tools.raises(TypeError)
def test_positional_arguments(self):
pycurl.CurlMulti(1)
# keyword arguments are rejected
@nose.tools.raises(TypeError)
def test_keyword_arguments(self):
pycurl.CurlMulti(a=1)