PATH:
opt
/
hc_python
/
lib
/
python3.12
/
site-packages
/
sentry_sdk
/
integrations
import sentry_sdk
from sentry_sdk.utils import (
    event_from_exception,
    ensure_integration_enabled,
    parse_version,
)

from sentry_sdk.integrations import _check_minimum_version, DidNotEnable, Integration
from sentry_sdk.scope import should_send_default_pii

try:
    import gql  # type: ignore[import-not-found]
    from graphql import (
        print_ast,
        get_operation_ast,
        DocumentNode,
        VariableDefinitionNode,
    )
    from gql.transport import Transport, AsyncTransport  # type: ignore[import-not-found]
    from gql.transport.exceptions import TransportQueryError  # type: ignore[import-not-found]

    try:
        # gql 4.0+
        from gql import GraphQLRequest
    except ImportError:
        # Older gql releases have no GraphQLRequest; None disables the
        # isinstance check in the event processor below.
        GraphQLRequest = None

except ImportError:
    raise DidNotEnable("gql is not installed")

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Any, Dict, Tuple, Union
    from sentry_sdk._types import Event, EventProcessor

    EventDataType = Dict[str, Union[str, Tuple[VariableDefinitionNode, ...]]]


class GQLIntegration(Integration):
    """Sentry integration that instruments ``gql.Client.execute``."""

    identifier = "gql"

    @staticmethod
    def setup_once():
        # type: () -> None
        """Check the installed gql version and patch ``gql.Client.execute``."""
        gql_version = parse_version(gql.__version__)
        _check_minimum_version(GQLIntegration, gql_version)

        _patch_execute()


def _data_from_document(document):
    # type: (DocumentNode) -> EventDataType
    """
    Build the request data for an event from a GraphQL document.

    The result always carries the printed query under ``"query"``. When
    ``get_operation_ast`` finds an operation, its variable definitions are
    added under ``"variables"`` and, if the operation is named, its name under
    ``"operationName"``. An empty dict is returned if the document raises
    ``AttributeError`` or ``TypeError`` while being inspected.
    """
    try:
        operation_ast = get_operation_ast(document)
        data = {"query": print_ast(document)}  # type: EventDataType

        if operation_ast is not None:
            data["variables"] = operation_ast.variable_definitions
            if operation_ast.name is not None:
                data["operationName"] = operation_ast.name.value

        return data
    except (AttributeError, TypeError):
        # Best effort: an object that is not a usable document yields no data.
        return dict()


def _transport_method(transport):
    # type: (Union[Transport, AsyncTransport]) -> str
    """
    The RequestsHTTPTransport allows defining the HTTP method; all other
    transports use POST.
    """
    try:
        return transport.method
    except AttributeError:
        return "POST"


def _request_info_from_transport(transport):
    # type: (Union[Transport, AsyncTransport, None]) -> Dict[str, str]
    """
    Return the request ``method`` and, when the transport exposes one, ``url``.

    An empty dict is returned when ``transport`` is ``None``.
    """
    if transport is None:
        return {}

    request_info = {
        "method": _transport_method(transport),
    }

    try:
        request_info["url"] = transport.url
    except AttributeError:
        # Transports without a ``url`` attribute simply omit the key.
        pass

    return request_info


def _patch_execute():
    # type: () -> None
    """
    Replace ``gql.Client.execute`` with a wrapper that reports query errors.

    The wrapper registers a GraphQL event processor on the isolation scope,
    delegates to the original ``execute``, and captures any
    ``TransportQueryError`` as an unhandled ``gql`` event before re-raising it.
    """
    real_execute = gql.Client.execute

    @ensure_integration_enabled(GQLIntegration, real_execute)
    def sentry_patched_execute(self, document_or_request, *args, **kwargs):
        # type: (gql.Client, DocumentNode, Any, Any) -> Any
        scope = sentry_sdk.get_isolation_scope()
        scope.add_event_processor(_make_gql_event_processor(self, document_or_request))

        try:
            return real_execute(self, document_or_request, *args, **kwargs)
        except TransportQueryError as e:
            event, hint = event_from_exception(
                e,
                client_options=sentry_sdk.get_client().options,
                mechanism={"type": "gql", "handled": False},
            )

            sentry_sdk.capture_event(event, hint)
            # The caller still sees the original error.
            raise e

    gql.Client.execute = sentry_patched_execute


def _make_gql_event_processor(client, document_or_request):
    # type: (gql.Client, Union[DocumentNode, gql.GraphQLRequest]) -> EventProcessor
    """
    Create an event processor that annotates events with GraphQL details.

    The processor always marks the request as ``api_target: graphql`` and adds
    the transport's method/URL. The query data and the response context (with
    the GraphQL errors taken from the exception in ``hint``) are only attached
    when ``should_send_default_pii()`` is true.
    """

    def processor(event, hint):
        # type: (Event, dict[str, Any]) -> Event
        try:
            errors = hint["exc_info"][1].errors
        except (AttributeError, KeyError):
            # No exception info, or the exception carries no ``errors``.
            errors = None

        request = event.setdefault("request", {})
        request.update(
            {
                "api_target": "graphql",
                **_request_info_from_transport(client.transport),
            }
        )

        if should_send_default_pii():
            if GraphQLRequest is not None and isinstance(
                document_or_request, GraphQLRequest
            ):
                # In v4.0.0, gql moved to using GraphQLRequest instead of
                # DocumentNode in execute
                # https://github.com/graphql-python/gql/pull/556
                document = document_or_request.document
            else:
                document = document_or_request

            request["data"] = _data_from_document(document)
            contexts = event.setdefault("contexts", {})
            response = contexts.setdefault("response", {})
            response.update(
                {
                    "data": {"errors": errors},
                    # The context type is a plain name. Storing the
                    # ``response`` dict itself here would make the context
                    # contain itself (a cyclic reference in the payload).
                    "type": "response",
                }
            )

        return event

    return processor
[-] aws_lambda.py
[edit]
[+]
__pycache__
[-] chalice.py
[edit]
[-] anthropic.py
[edit]
[-] unleash.py
[edit]
[-] atexit.py
[edit]
[-] rq.py
[edit]
[-] gnu_backtrace.py
[edit]
[-] langgraph.py
[edit]
[+]
spark
[-] aiohttp.py
[edit]
[-] socket.py
[edit]
[-] sys_exit.py
[edit]
[-] boto3.py
[edit]
[-] sqlalchemy.py
[edit]
[-] asyncpg.py
[edit]
[-] openfeature.py
[edit]
[-] modules.py
[edit]
[-] executing.py
[edit]
[-] wsgi.py
[edit]
[-] dedupe.py
[edit]
[-] __init__.py
[edit]
[-] huey.py
[edit]
[-] fastapi.py
[edit]
[-] stdlib.py
[edit]
[-] cloud_resource_context.py
[edit]
[-] starlette.py
[edit]
[-] flask.py
[edit]
[-] bottle.py
[edit]
[-] starlite.py
[edit]
[-] rust_tracing.py
[edit]
[-] otlp.py
[edit]
[-] statsig.py
[edit]
[-] excepthook.py
[edit]
[-] ariadne.py
[edit]
[+]
pydantic_ai
[+]
..
[-] sanic.py
[edit]
[+]
google_genai
[+]
django
[+]
openai_agents
[-] dramatiq.py
[edit]
[-] langchain.py
[edit]
[-] graphene.py
[edit]
[-] loguru.py
[edit]
[-] clickhouse_driver.py
[edit]
[+]
celery
[-] tornado.py
[edit]
[-] strawberry.py
[edit]
[-] arq.py
[edit]
[-] litellm.py
[edit]
[-] openai.py
[edit]
[-] beam.py
[edit]
[-] launchdarkly.py
[edit]
[+]
grpc
[-] threading.py
[edit]
[+]
opentelemetry
[-] _wsgi_common.py
[edit]
[-] _asgi_common.py
[edit]
[-] argv.py
[edit]
[-] litestar.py
[edit]
[-] httpx.py
[edit]
[-] pymongo.py
[edit]
[-] mcp.py
[edit]
[-] cohere.py
[edit]
[-] pyramid.py
[edit]
[-] unraisablehook.py
[edit]
[-] quart.py
[edit]
[-] falcon.py
[edit]
[-] pure_eval.py
[edit]
[+]
redis
[-] logging.py
[edit]
[-] asgi.py
[edit]
[-] gql.py
[edit]
[-] gcp.py
[edit]
[-] ray.py
[edit]
[-] trytond.py
[edit]
[-] huggingface_hub.py
[edit]
[-] asyncio.py
[edit]
[-] typer.py
[edit]
[-] serverless.py
[edit]