PATH:
opt
/
hc_python
/
lib
/
python3.12
/
site-packages
/
sentry_sdk
/
integrations
import json

import sentry_sdk
from sentry_sdk.consts import OP, SPANSTATUS
from sentry_sdk.api import continue_trace, get_baggage, get_traceparent
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.integrations._wsgi_common import request_body_within_bounds
from sentry_sdk.tracing import (
    BAGGAGE_HEADER_NAME,
    SENTRY_TRACE_HEADER_NAME,
    TransactionSource,
)
from sentry_sdk.utils import (
    AnnotatedValue,
    capture_internal_exceptions,
    event_from_exception,
)
from typing import TypeVar

R = TypeVar("R")

try:
    from dramatiq.broker import Broker
    from dramatiq.middleware import Middleware, default_middleware
    from dramatiq.errors import Retry
    from dramatiq.message import Message
except ImportError:
    raise DidNotEnable("Dramatiq is not installed")

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Any, Callable, Dict, Optional, Union
    from sentry_sdk._types import Event, Hint


class DramatiqIntegration(Integration):
    """
    Dramatiq integration for Sentry.

    Please make sure that you call `sentry_sdk.init` *before* initializing
    your broker, as it monkey patches `Broker.__init__`.

    This integration was originally developed and maintained
    by https://github.com/jacobsvante and later donated to the Sentry
    project.
    """

    identifier = "dramatiq"
    # Origin string attached to transactions created by this integration.
    origin = f"auto.queue.{identifier}"

    @staticmethod
    def setup_once():
        # type: () -> None
        """Install the `Broker.__init__` patch."""
        _patch_dramatiq_broker()


def _patch_dramatiq_broker():
    # type: () -> None
    """
    Replace `Broker.__init__` with a wrapper that, when the integration is
    enabled, puts a fresh `SentryMiddleware` at the front of the broker's
    middleware list.
    """
    original_broker__init__ = Broker.__init__

    def sentry_patched_broker__init__(self, *args, **kw):
        # type: (Broker, *Any, **Any) -> None
        integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)

        try:
            middleware = kw.pop("middleware")
        except KeyError:
            # Unfortunately Broker and StubBroker allow middleware to be
            # passed in as a positional argument, whilst RabbitmqBroker and
            # RedisBroker do not.
            if len(args) == 1:
                middleware = args[0]
                # The positional value is re-passed below as a keyword, so
                # it must not be forwarded twice.
                args = ()
            else:
                middleware = None

        if middleware is None:
            # No middleware given: instantiate dramatiq's defaults ourselves
            # so the Sentry middleware can be placed in front of them.
            middleware = [m() for m in default_middleware]
        else:
            # Copy so the caller's sequence is never mutated.
            middleware = list(middleware)

        if integration is not None:
            # Drop any SentryMiddleware the caller already supplied so that
            # exactly one instance ends up in the list, at index 0.
            middleware = [m for m in middleware if not isinstance(m, SentryMiddleware)]
            middleware.insert(0, SentryMiddleware())

        kw["middleware"] = middleware
        original_broker__init__(self, *args, **kw)

    Broker.__init__ = sentry_patched_broker__init__


class SentryMiddleware(Middleware):  # type: ignore[misc]
    """
    A Dramatiq middleware that automatically captures and sends
    exceptions to Sentry.

    This is automatically added to every instantiated broker via the
    DramatiqIntegration.
    """

    # Key in `message.options` under which the trace headers are stored.
    SENTRY_HEADERS_NAME = "_sentry_headers"

    def before_enqueue(self, broker, message, delay):
        # type: (Broker, Message[R], int) -> None
        """Store the current baggage and traceparent in the message options."""
        integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
        if integration is None:
            return

        message.options[self.SENTRY_HEADERS_NAME] = {
            BAGGAGE_HEADER_NAME: get_baggage(),
            SENTRY_TRACE_HEADER_NAME: get_traceparent(),
        }

    def before_process_message(self, broker, message):
        # type: (Broker, Message[R]) -> None
        """
        Enter an isolation scope for the message and start a transaction.

        The scope context manager is stashed on the message as
        `_scope_manager` so that `after_process_message` can exit it.
        """
        integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
        if integration is None:
            return

        message._scope_manager = sentry_sdk.isolation_scope()
        scope = message._scope_manager.__enter__()
        scope.clear_breadcrumbs()
        scope.set_extra("dramatiq_message_id", message.message_id)
        scope.add_event_processor(_make_message_event_processor(message, integration))

        sentry_headers = message.options.get(self.SENTRY_HEADERS_NAME) or {}
        if "retries" in message.options:
            # start new trace in case of retrying
            sentry_headers = {}

        transaction = continue_trace(
            sentry_headers,
            name=message.actor_name,
            op=OP.QUEUE_TASK_DRAMATIQ,
            source=TransactionSource.TASK,
            origin=DramatiqIntegration.origin,
        )
        transaction.set_status(SPANSTATUS.OK)
        sentry_sdk.start_transaction(
            transaction,
            name=message.actor_name,
            op=OP.QUEUE_TASK_DRAMATIQ,
            source=TransactionSource.TASK,
        )
        transaction.__enter__()

    def after_process_message(self, broker, message, *, result=None, exception=None):
        # type: (Broker, Message[R], Optional[Any], Optional[Exception]) -> None
        """
        Finish the transaction and leave the isolation scope for the message.

        An event is captured for `exception` unless it is a `Retry` or an
        instance of the types listed in the message's or actor's `throws`
        option.
        """
        integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
        if integration is None:
            return

        actor = broker.get_actor(message.actor_name)
        throws = message.options.get("throws") or actor.options.get("throws")

        # The attribute only exists if before_process_message ran for this
        # message, so do not assume it is there.
        scope_manager = getattr(message, "_scope_manager", None)
        transaction = sentry_sdk.get_current_scope().transaction
        if not transaction:
            # Nothing to finish, but the isolation scope entered in
            # before_process_message must still be exited or it leaks.
            if scope_manager is not None:
                scope_manager.__exit__(None, None, None)
            return None

        is_event_capture_required = (
            exception is not None
            and not (throws and isinstance(exception, throws))
            and not isinstance(exception, Retry)
        )
        if not is_event_capture_required:
            # normal transaction finish
            transaction.__exit__(None, None, None)
            if scope_manager is not None:
                scope_manager.__exit__(None, None, None)
            return

        event, hint = event_from_exception(
            exception,  # type: ignore[arg-type]
            client_options=sentry_sdk.get_client().options,
            mechanism={
                "type": DramatiqIntegration.identifier,
                "handled": False,
            },
        )
        sentry_sdk.capture_event(event, hint=hint)
        # transaction error
        transaction.__exit__(type(exception), exception, None)
        if scope_manager is not None:
            scope_manager.__exit__(type(exception), exception, None)


def _make_message_event_processor(message, integration):
    # type: (Message[R], DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]]
    """
    Build an event processor that adds the message's data to each event.

    `integration` is accepted for signature compatibility; it is not used.
    """

    def inner(event, hint):
        # type: (Event, Hint) -> Optional[Event]
        with capture_internal_exceptions():
            DramatiqMessageExtractor(message).extract_into_event(event)

        return event

    return inner


class DramatiqMessageExtractor:
    """Copies a Dramatiq message's data into a Sentry event's contexts."""

    def __init__(self, message):
        # type: (Message[R]) -> None
        # Snapshot of the message fields as a plain dict.
        self.message_data = dict(message.asdict())

    def content_length(self):
        # type: () -> int
        """Return the length of the JSON-serialized message data."""
        return len(json.dumps(self.message_data))

    def extract_into_event(self, event):
        # type: (Event) -> None
        """
        Write the message data to `event["contexts"]["dramatiq"]["data"]`.

        Does nothing when the client is inactive. The data is replaced by an
        over-size-limit `AnnotatedValue` when `request_body_within_bounds`
        rejects its length.
        """
        client = sentry_sdk.get_client()
        if not client.is_active():
            return

        contexts = event.setdefault("contexts", {})
        request_info = contexts.setdefault("dramatiq", {})
        request_info["type"] = "dramatiq"

        data = None  # type: Optional[Union[AnnotatedValue, Dict[str, Any]]]
        if not request_body_within_bounds(client, self.content_length()):
            data = AnnotatedValue.removed_because_over_size_limit()
        else:
            data = self.message_data

        request_info["data"] = data
[-] aws_lambda.py
[edit]
[+]
__pycache__
[-] chalice.py
[edit]
[-] anthropic.py
[edit]
[-] unleash.py
[edit]
[-] atexit.py
[edit]
[-] rq.py
[edit]
[-] gnu_backtrace.py
[edit]
[-] langgraph.py
[edit]
[+]
spark
[-] aiohttp.py
[edit]
[-] socket.py
[edit]
[-] sys_exit.py
[edit]
[-] boto3.py
[edit]
[-] sqlalchemy.py
[edit]
[-] asyncpg.py
[edit]
[-] openfeature.py
[edit]
[-] modules.py
[edit]
[-] executing.py
[edit]
[-] wsgi.py
[edit]
[-] dedupe.py
[edit]
[-] __init__.py
[edit]
[-] huey.py
[edit]
[-] fastapi.py
[edit]
[-] stdlib.py
[edit]
[-] cloud_resource_context.py
[edit]
[-] starlette.py
[edit]
[-] flask.py
[edit]
[-] bottle.py
[edit]
[-] starlite.py
[edit]
[-] rust_tracing.py
[edit]
[-] otlp.py
[edit]
[-] statsig.py
[edit]
[-] excepthook.py
[edit]
[-] ariadne.py
[edit]
[+]
pydantic_ai
[+]
..
[-] sanic.py
[edit]
[+]
google_genai
[+]
django
[+]
openai_agents
[-] dramatiq.py
[edit]
[-] langchain.py
[edit]
[-] graphene.py
[edit]
[-] loguru.py
[edit]
[-] clickhouse_driver.py
[edit]
[+]
celery
[-] tornado.py
[edit]
[-] strawberry.py
[edit]
[-] arq.py
[edit]
[-] litellm.py
[edit]
[-] openai.py
[edit]
[-] beam.py
[edit]
[-] launchdarkly.py
[edit]
[+]
grpc
[-] threading.py
[edit]
[+]
opentelemetry
[-] _wsgi_common.py
[edit]
[-] _asgi_common.py
[edit]
[-] argv.py
[edit]
[-] litestar.py
[edit]
[-] httpx.py
[edit]
[-] pymongo.py
[edit]
[-] mcp.py
[edit]
[-] cohere.py
[edit]
[-] pyramid.py
[edit]
[-] unraisablehook.py
[edit]
[-] quart.py
[edit]
[-] falcon.py
[edit]
[-] pure_eval.py
[edit]
[+]
redis
[-] logging.py
[edit]
[-] asgi.py
[edit]
[-] gql.py
[edit]
[-] gcp.py
[edit]
[-] ray.py
[edit]
[-] trytond.py
[edit]
[-] huggingface_hub.py
[edit]
[-] asyncio.py
[edit]
[-] typer.py
[edit]
[-] serverless.py
[edit]