Skip to content

Commit

Permalink
Merge pull request #416 from rollbar/added-thread-pool
Browse files Browse the repository at this point in the history
  • Loading branch information
danielmorell authored Nov 26, 2022
2 parents f95076d + e14af00 commit 55d08a6
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 25 deletions.
44 changes: 22 additions & 22 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,28 @@ jobs:
- name: Install dependencies
run: pip install setuptools==39.2.0 --force-reinstall

- name: Install Python 2 dependencies
if: ${{ contains(matrix.python-version, '2.7') }}
# certifi dropped support for Python 2 in 2020.4.5.2 but only started
# using Python 3 syntax in 2022.5.18. 2021.10.8 is the last release with
# Python 2 support.
run: pip install certifi==2021.10.8 requests==2.27.1 incremental==21.3.0

- name: Install Python 3.4 dependencies
if: ${{ contains(matrix.python-version, '3.4') }}
# certifi uses the 'typing' from Python 3.5 module starting in 2022.5.18
run: pip install certifi==2021.10.8 "typing-extensions<4" incremental==21.3.0

- name: Install Python 3.5 dependencies
if: ${{ contains(matrix.python-version, '3.5') }}
# typing-extensions dropped support for Python 3.5 in version 4
run: pip install "typing-extensions<4"

- name: Install Python 3.6 dependencies
if: ${{ contains(matrix.python-version, '3.6') }}
# typing-extensions dropped support for Python 3.6 in version 4.2
run: pip install "typing-extensions<4.2"

- name: Set the framework
run: echo ${{ matrix.framework }} >> $GITHUB_ENV

Expand Down Expand Up @@ -232,27 +254,5 @@ jobs:
if: ${{ contains(matrix.framework, 'FASTAPI_VERSION') }}
run: pip install fastapi==$FASTAPI_VERSION

- name: Install Python 2 dependencies
if: ${{ contains(matrix.python-version, '2.7') }}
# certifi dropped support for Python 2 in 2020.4.5.2 but only started
# using Python 3 syntax in 2022.5.18. 2021.10.8 is the last release with
# Python 2 support.
run: pip install certifi==2021.10.8 requests==2.27.1

- name: Install Python 3.4 dependencies
if: ${{ contains(matrix.python-version, '3.4') }}
# certifi uses the 'typing' from Python 3.5 module starting in 2022.5.18
run: pip install certifi==2021.10.8 "typing-extensions<4"

- name: Install Python 3.5 dependencies
if: ${{ contains(matrix.python-version, '3.5') }}
# typing-extensions dropped support for Python 3.5 in version 4
run: pip install "typing-extensions<4"

- name: Install Python 3.6 dependencies
if: ${{ contains(matrix.python-version, '3.6') }}
# typing-extensions dropped support for Python 3.6 in version 4.2
run: pip install "typing-extensions<4.2"

- name: Run tests
run: python setup.py test
29 changes: 26 additions & 3 deletions rollbar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from rollbar.lib import events, filters, dict_merge, parse_qs, text, transport, urljoin, iteritems, defaultJSONEncode


__version__ = '0.16.3'
__version__ = '0.16.4beta'
__log_name__ = 'rollbar'
log = logging.getLogger(__log_name__)

Expand Down Expand Up @@ -124,7 +124,7 @@ def wrap(*args, **kwargs):
from twisted.internet.ssl import CertificateOptions
from twisted.internet import task, defer, ssl, reactor
from zope.interface import implementer

@implementer(IPolicyForHTTPS)
class VerifyHTTPS(object):
def __init__(self):
Expand Down Expand Up @@ -275,7 +275,12 @@ def _get_fastapi_request():
'root': None, # root path to your code
'branch': None, # git branch name
'code_version': None,
'handler': 'default', # 'blocking', 'thread' (default), 'async', 'agent', 'tornado', 'gae', 'twisted' or 'httpx'
# 'blocking', 'thread' (default), 'async', 'agent', 'tornado', 'gae', 'twisted', 'httpx' or 'thread_pool'
# 'async' requires Python 3.4 or higher.
# 'httpx' requires Python 3.7 or higher.
# 'thread_pool' requires Python 3.2 or higher.
'handler': 'default',
'thread_pool_workers': None,
'endpoint': DEFAULT_ENDPOINT,
'timeout': DEFAULT_TIMEOUT,
'agent.log_file': 'log.rollbar',
Expand Down Expand Up @@ -383,6 +388,9 @@ def init(access_token, environment='production', scrub_fields=None, url_fields=N

if SETTINGS.get('handler') == 'agent':
agent_log = _create_agent_log()
elif SETTINGS.get('handler') == 'thread_pool':
from rollbar.lib.thread_pool import init_pool
init_pool(SETTINGS.get('thread_pool_workers', None))

if not SETTINGS['locals']['safelisted_types'] and SETTINGS['locals']['whitelisted_types']:
warnings.warn('whitelisted_types deprecated use safelisted_types instead', DeprecationWarning)
Expand Down Expand Up @@ -523,6 +531,7 @@ def send_payload(payload, access_token):
- 'gae': calls _send_payload_appengine() (which makes a blocking call to Google App Engine)
- 'twisted': calls _send_payload_twisted() (which makes an async HTTP request using Twisted and Treq)
- 'httpx': calls _send_payload_httpx() (which makes an async HTTP request using HTTPX)
- 'thread_pool': uses a pool of worker threads to make HTTP requests off the main thread. Returns immediately.
"""
payload = events.on_payload(payload)
if payload is False:
Expand Down Expand Up @@ -569,6 +578,8 @@ def send_payload(payload, access_token):
_send_payload_async(payload_str, access_token)
elif handler == 'thread':
_send_payload_thread(payload_str, access_token)
elif handler == 'thread_pool':
_send_payload_thread_pool(payload_str, access_token)
else:
# default to 'thread'
_send_payload_thread(payload_str, access_token)
Expand Down Expand Up @@ -1510,6 +1521,18 @@ def _send_payload_thread(payload_str, access_token):
thread.start()


def _send_payload_pool(payload_str, access_token):
try:
_post_api('item/', payload_str, access_token=access_token)
except Exception as e:
log.exception('Exception while posting item %r', e)


def _send_payload_thread_pool(payload_str, access_token):
from rollbar.lib.thread_pool import submit
submit(_send_payload_pool, payload_str, access_token)


def _send_payload_appengine(payload_str, access_token):
try:
_post_api_appengine('item/', payload_str, access_token=access_token)
Expand Down
38 changes: 38 additions & 0 deletions rollbar/lib/thread_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import logging
import os
import sys
from concurrent.futures import ThreadPoolExecutor

_pool = None # type: ThreadPoolExecutor|None

log = logging.getLogger(__name__)


def init_pool(max_workers):
"""
Creates the thread pool with the max workers.
:type max_workers: int|None
:param max_workers: If max_workers is None it will use the logic from the standard library to calculate the number
of threads. However, we ported the logic from Python 3.5 to earlier versions.
"""
if max_workers is None and sys.version_info < (3, 5):
max_workers = (os.cpu_count() or 1) * 5

global _pool
_pool = ThreadPoolExecutor(max_workers)


def submit(worker, payload_str, access_token):
"""
Submit a new task to the thread pool.
:type worker: function
:type payload_str: str
:type access_token: str
"""
global _pool
if _pool is None:
log.warning('pyrollbar: Thead pool not initialized. Please ensure init_pool() is called prior to submit().')
return
_pool.submit(worker, payload_str, access_token)
26 changes: 26 additions & 0 deletions rollbar/test/test_rollbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,32 @@ def _raise():

send_payload_httpx.assert_called_once()

@unittest.skipUnless(sys.version_info >= (3, 6), 'assert_called_once support requires Python3.6+')
@mock.patch('rollbar._send_payload_thread_pool')
def test_thread_pool_handler(self, send_payload_thread_pool):
def _raise():
try:
raise Exception('foo')
except:
rollbar.report_exc_info()
rollbar.SETTINGS['handler'] = 'thread_pool'
_raise()

send_payload_thread_pool.assert_called_once()

@unittest.skipUnless(sys.version_info >= (3, 2), 'concurrent.futures support requires Python3.2+')
def test_thread_pool_submit(self):
from rollbar.lib.thread_pool import init_pool, submit
init_pool(1)
ran = {'nope': True} # dict used so it is not shadowed in run

def run(payload_str, access_token):
ran['nope'] = False

submit(run, 'foo', 'bar')
self.assertFalse(ran['nope'])


@mock.patch('rollbar.send_payload')
def test_args_constructor(self, send_payload):

Expand Down

0 comments on commit 55d08a6

Please sign in to comment.