Перейти к содержанию

Сервисы (tinvest_signal_engine.services)

Точки входа console_scripts из pyproject.toml.

ingestor

tinvest_signal_engine.services.ingestor

Инжестор: подписка на MarketDataStream и публикация в топик сырых событий.

detector_service

tinvest_signal_engine.services.detector_service

Сервис детектора: чтение raw-топика, запись сигналов в Postgres и Kafka.

api

tinvest_signal_engine.services.api

HTTP API поверх Postgres: последние сигналы и сводки по типам.

HealthResponse

Bases: BaseModel

Ответ проверки живости процесса (без запроса к Postgres).

Source code in src/tinvest_signal_engine/services/api.py
class HealthResponse(BaseModel):
    """Ответ проверки живости процесса (без запроса к Postgres)."""

    status: str = Field(
        description="Обычно `ok`, если процесс принимает HTTP.",
    )
    runtime: dict[str, Any] = Field(
        default_factory=dict,
        description="Build/runtime fingerprint: app_version, commit_sha, build_time.",
    )

ReadyResponse

Bases: BaseModel

Готовность к трафику: проверка соединения с Postgres.

Source code in src/tinvest_signal_engine/services/api.py
class ReadyResponse(BaseModel):
    """Готовность к трафику: проверка соединения с Postgres."""

    status: str = Field(description="`ready`, если БД отвечает на ping.")
    runtime: dict[str, Any] = Field(default_factory=dict)

RecentSignalsResponse

Bases: BaseModel

Список последних сигналов из таблицы Postgres.

Source code in src/tinvest_signal_engine/services/api.py
class RecentSignalsResponse(BaseModel):
    """Список последних сигналов из таблицы Postgres."""

    items: list[dict[str, Any]] = Field(
        description=(
            "Записи сигналов в том же виде, что возвращает хранилище."
        ),
    )
    count: int = Field(description="Длина списка `items`.")

SignalSummaryRow

Bases: BaseModel

Одна строка агрегированной статистики по типу сигнала.

Source code in src/tinvest_signal_engine/services/api.py
class SignalSummaryRow(BaseModel):
    """Одна строка агрегированной статистики по типу сигнала."""

    signal_type: str = Field(
        description="Имя типа сигнала (`signal_type`).",
    )
    signal_count: int = Field(
        description="Число срабатываний за окно.",
    )

SignalSummaryResponse

Bases: BaseModel

Сводка по типам сигналов за последние minutes минут.

Source code in src/tinvest_signal_engine/services/api.py
class SignalSummaryResponse(BaseModel):
    """Сводка по типам сигналов за последние `minutes` минут."""

    items: list[SignalSummaryRow] = Field(
        description=(
            "Строки сводки, отсортированные по убыванию счётчика."
        ),
    )
    minutes: int = Field(
        description="Размер временного окна запроса в минутах.",
    )

AdminFeedbackIn

Bases: BaseModel

Разметка сигнала в админке.

Source code in src/tinvest_signal_engine/services/api.py
class AdminFeedbackIn(BaseModel):
    """Разметка сигнала в админке."""

    signal_id: str
    label: str = Field(description="useful | noise | unsure")
    note: str = ""

    @field_validator("signal_id")
    @classmethod
    def _signal_uuid(cls, v: str) -> str:
        s = v.strip()
        uuid.UUID(s)
        return s

    @field_validator("label")
    @classmethod
    def _label_ok(cls, v: str) -> str:
        allowed = {"useful", "noise", "unsure"}
        if v.strip() not in allowed:
            raise ValueError(f"label must be one of: {', '.join(sorted(allowed))}")
        return v.strip()

DeliverySimulationIn

Bases: BaseModel

Dry-run delivery settings over recent stored signals.

Source code in src/tinvest_signal_engine/services/api.py
class DeliverySimulationIn(BaseModel):
    """Dry-run delivery settings over recent stored signals."""

    preset: str = Field(
        default="current",
        description="current | conservative | admin_only_rollout",
    )
    type_rules_json: str = Field(default="")
    min_quality: int | None = Field(default=None, ge=0, le=100)
    max_per_hour: int | None = Field(default=None, ge=0, le=1000)
    instrument_cooldown_seconds: int | None = Field(default=None, ge=0, le=86_400)
    minutes: int = Field(default=1440, ge=0, le=10_080)
    limit: int = Field(default=200, ge=1, le=200)

    @field_validator("preset")
    @classmethod
    def _preset_ok(cls, v: str) -> str:
        value = (v or "current").strip().lower()
        allowed = {"current", "conservative", "admin_only_rollout"}
        if value not in allowed:
            raise ValueError(f"preset must be one of: {', '.join(sorted(allowed))}")
        return value

require_admin

require_admin(request: Request, token: Annotated[str | None, Query(description='Значение ADMIN_API_TOKEN')] = None, x_admin_token: Annotated[str | None, Header(alias='X-Admin-Token')] = None) -> None

Проверка токена для JSON-эндпоинтов админки.

Source code in src/tinvest_signal_engine/services/api.py
def require_admin(
    request: Request,
    token: Annotated[str | None, Query(description="Значение ADMIN_API_TOKEN")] = None,
    x_admin_token: Annotated[
        str | None, Header(alias="X-Admin-Token")
    ] = None,
) -> None:
    """Проверка токена для JSON-эндпоинтов админки."""
    settings: RuntimeSettings = request.app.state.settings
    expected = settings.admin_api_token
    if not expected:
        raise HTTPException(
            status_code=503,
            detail="Админ API отключён: задайте переменную окружения ADMIN_API_TOKEN.",
        )
    provided = (token or x_admin_token or "").strip()
    if provided != expected:
        raise HTTPException(
            status_code=401,
            detail=(
                "Неверный или отсутствующий токен: query-параметр `token` "
                "или заголовок `X-Admin-Token`."
            ),
        )

local_notifier

tinvest_signal_engine.services.local_notifier

Потребитель топика сигналов на хосте; показ desktop-уведомлений (Windows).

threshold_cron

tinvest_signal_engine.services.threshold_cron

Пересчёт порогов по истории свечей и запись YAML overrides.

Основной сценарий в compose — вызов :func:run_recalc_once из Dagster (threshold_recalc_job). :func:main оставлен для legacy-сервиса tinvest-threshold-cron (бесконечный цикл).

run_recalc_once

run_recalc_once(settings: RuntimeSettings | None = None) -> None

Один прогон пересчёта порогов (Dagster, тесты, ручной вызов).

Source code in src/tinvest_signal_engine/services/threshold_cron.py
def run_recalc_once(settings: RuntimeSettings | None = None) -> None:
    """Один прогон пересчёта порогов (Dagster, тесты, ручной вызов)."""
    cfg = settings or RuntimeSettings.from_env()
    if not cfg.tinvest_token:
        raise RuntimeError("TINVEST_TOKEN is required")
    _recalculate(cfg)

Вспомогательные модули

logging_utils

tinvest_signal_engine.logging_utils

Единообразная настройка логирования для CLI-сервисов.

desktop_notifications

tinvest_signal_engine.desktop_notifications

Всплывающие уведомления на рабочем столе Windows для локального нотификатора.