> ## Documentation Index
> Fetch the complete documentation index at: https://docs.xquik.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Prefect

> Run scheduled Xquik read workflows in Prefect and hand off writes, monitor replays, and extraction jobs to REST, SDKs, or MCP

Use the [`prefect-xquik`](https://github.com/Xquik-dev/prefect-xquik) collection when a Prefect flow needs scheduled X (Twitter) reads: tweet search, tweet lookup, user search, user lookup, user timelines, or regional trends.

The current `0.1.5` release is read-focused. It ships one credentials block, an async Xquik client, and 6 Prefect tasks that return raw Xquik JSON dictionaries for downstream tasks, warehouses, dashboards, or alerts.

## Prerequisites

* [Xquik API key](/quickstart)
* Python 3.10+
* Prefect 3
* Prefect work pool or local runner for scheduled deployments

## Scheduled Workflows

<CardGroup cols={2}>
  <Card title="Social Listening" icon="search">
    Schedule `search_tweets` to collect recent or top posts for a saved query, then pass raw JSON to reporting tasks.
  </Card>

  <Card title="Profile Enrichment" icon="users">
    Use `search_users` and `get_user` to enrich customer, creator, or competitor records with current public profiles.
  </Card>

  <Card title="Timeline Refresh" icon="list">
    Run `get_user_tweets` on a schedule to refresh public timelines with optional replies and parent-tweet context.
  </Card>

  <Card title="Trend Alerts" icon="trending-up">
    Call `get_trends` for worldwide or regional topics before downstream alert, dashboard, or warehouse steps.
  </Card>
</CardGroup>

## Install

PyPI publication is pending. Install the pinned GitHub release wheel:

```bash theme={null}
pip install \
  https://github.com/Xquik-dev/prefect-xquik/releases/download/v0.1.5/prefect_xquik-0.1.5-py3-none-any.whl
```

Register the block type with Prefect:

```bash theme={null}
prefect block register -m prefect_xquik
```

## Credential Block

Store the API key in a Prefect block, not in flow source files. Set `base_url` to the public Xquik REST base URL when creating the block.

```python theme={null}
from prefect_xquik import XquikCredentials

credentials = XquikCredentials(
    api_key="xq_YOUR_KEY_HERE",
    base_url="https://xquik.com/api/v1",
)
credentials.save("xquik", overwrite=True)
```

If you create the block in the Prefect UI, use these values:

<CardGroup cols={2}>
  <Card title="Block Type" icon="blocks">
    `Xquik Credentials`
  </Card>

  <Card title="API Key" icon="key-round">
    Paste an Xquik API key from the dashboard.
  </Card>

  <Card title="Base URL" icon="link">
    `https://xquik.com/api/v1`
  </Card>

  <Card title="API Contract" icon="file-json">
    Keep `2026-04-29` unless you intentionally pin a different public contract.
  </Card>
</CardGroup>

## Collection Shape

<CardGroup cols={2}>
  <Card title="Credentials" icon="key-round">
    `XquikCredentials` stores the API key, base URL, contract header, and timeout.
  </Card>

  <Card title="Client" icon="terminal">
    `XquikClient` uses `httpx.AsyncClient`, sends `x-api-key`, and raises `XquikError` for request or response failures.
  </Card>

  <Card title="Tasks" icon="workflow">
    6 async Prefect tasks cover read workflows for tweets, users, user timelines, and trends.
  </Card>

  <Card title="Returns" icon="file-json">
    Tasks return raw Xquik JSON dictionaries. Shape rows in later Prefect tasks.
  </Card>

  <Card title="Retries" icon="refresh-cw">
    Set retry policy and delay with Prefect task options such as `retries` and `retry_delay_seconds`.
  </Card>

  <Card title="Scope" icon="shield-check">
    Version `0.1.5` is read-only. Use REST, SDKs, or MCP for writes, monitors, webhooks, and extraction jobs.
  </Card>
</CardGroup>

## Tasks

<CardGroup cols={2}>
  <Card title="Search Tweets" icon="search">
    `search_tweets(credentials, query, limit=25, query_type="Latest")` calls `GET /x/tweets/search`.
  </Card>

  <Card title="Get Tweet" icon="message-square">
    `get_tweet(credentials, tweet_id)` calls `GET /x/tweets/{id}`.
  </Card>

  <Card title="Search Users" icon="users">
    `search_users(credentials, query, cursor=None)` calls `GET /x/users/search`.
  </Card>

  <Card title="Get User" icon="user-round">
    `get_user(credentials, user_id)` calls `GET /x/users/{id}`. `user_id` can also be a username, with or without a leading `@`.
  </Card>

  <Card title="Get User Timeline" icon="list">
    `get_user_tweets(credentials, user_id, include_replies=False)` calls `GET /x/users/{id}/tweets`.
  </Card>

  <Card title="Get Trends" icon="trending-up">
    `get_trends(credentials, woeid=1, count=30)` calls `GET /x/trends`.
  </Card>
</CardGroup>

## Copy-Ready Flow

Use this flow when a scheduled job should capture current posts and worldwide trends for a reporting task.

```python theme={null}
from __future__ import annotations

from prefect import flow
from prefect_xquik import XquikCredentials, get_trends, search_tweets


@flow
async def social_signal_flow() -> dict[str, object]:
    credentials = XquikCredentials.load("xquik")

    tweets = await search_tweets(
        credentials,
        query='"prefect" OR "workflow orchestration"',
        query_type="Latest",
        limit=25,
    )
    trends = await get_trends(credentials, woeid=1, count=10)

    return {"tweets": tweets, "trends": trends}


if __name__ == "__main__":
    import asyncio

    asyncio.run(social_signal_flow())
```

Run it locally:

```bash theme={null}
python social_signal_flow.py
```

Create a Prefect deployment when the flow is ready for a schedule:

```bash theme={null}
prefect deploy social_signal_flow.py:social_signal_flow --name xquik-social-signals
```

## Result Handoff

<CardGroup cols={2}>
  <Card title="Tweet Pages" icon="message-square">
    `search_tweets` and `get_user_tweets` return `tweets`, `has_next_page`, and `next_cursor`. Store the cursor with the job checkpoint.
  </Card>

  <Card title="User Pages" icon="users">
    `search_users` returns `users`, `has_next_page`, and `next_cursor`. `get_user` returns one public profile dictionary with fields such as `id`, `username`, `name`, `followers`, `verified`, and `profilePicture`.
  </Card>

  <Card title="Trend Pages" icon="trending-up">
    `get_trends` returns `trends`, `count`, and `woeid`. Trend rows include `name` and can include `description`, `query`, and `rank`.
  </Card>

  <Card title="Downstream Rows" icon="table">
    Normalize raw dictionaries in a follow-up task before writing to Slack, Sheets, a warehouse, or a dashboard. Keep `tweet_rows` with `id`, `text`, `author.username`, `createdAt`, and `url`; `user_rows` with `id`, `username`, `name`, `followers`, `verified`, and `profilePicture`; and `trend_rows` with `name`, `rank`, `query`, and `description`.
  </Card>
</CardGroup>
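
A minimal sketch of the follow-up normalization step, using only the field names listed above. It assumes `author` is a nested object with a `username` key (inferred from the `author.username` path). The function names and the flattened output keys `author_username` and `created_at` are illustrative choices, not part of `prefect-xquik`.

```python
from __future__ import annotations

from typing import Any


def to_tweet_rows(page: dict[str, Any]) -> list[dict[str, Any]]:
    """Flatten a raw tweet page into compact rows (hypothetical helper)."""
    rows: list[dict[str, Any]] = []
    for tweet in page.get("tweets") or []:
        # Assumption: the author is a nested dictionary with a `username` key.
        author = tweet.get("author") or {}
        rows.append(
            {
                "id": tweet.get("id"),
                "text": tweet.get("text"),
                "author_username": author.get("username"),
                "created_at": tweet.get("createdAt"),
                "url": tweet.get("url"),
            }
        )
    return rows


def to_trend_rows(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Keep the four trend fields; optional fields become None when absent."""
    keep = ("name", "rank", "query", "description")
    return [
        {key: trend.get(key) for key in keep}
        for trend in payload.get("trends") or []
    ]
```

Wrap helpers like these in a Prefect task of your own so the raw dictionaries stay available for reruns and only the compact rows reach the destination.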

## REST and MCP Handoff

When a Prefect flow needs writes, monitors, webhooks, event replay, or extraction jobs, keep the `prefect-xquik` task scope focused on reads. Call REST, a generated SDK, or MCP from a follow-up task, then normalize REST camelCase fields to compact snake\_case handoff rows before downstream retries.

<CardGroup cols={2}>
  <Card title="Tweet or Reply Write" icon="send">
    Call `POST /x/tweets` through REST or an SDK. Pass public image or MP4 URLs in `media`; do not send `media_ids`. Store confirmed `tweet_id` and `charged_credits`, or store `write_action_id` and poll `GET /x/write-actions/{id}` before retrying a pending write.
  </Card>

  <Card title="DM Media Handoff" icon="message-circle">
    Upload local media with `POST /x/media`, pass exactly 1 returned `mediaId` as `media_ids` on `POST /x/dm/{userId}`, and store `message_id`, `media_id`, `account`, and `user_id`. Leave `reply_to_message_id` unset.
  </Card>

  <Card title="Stored Event Replay" icon="activity">
    Call `GET /events` with `after` when a flow needs stored monitor replay. Store `event_id`, `monitor_id`, `monitor_type`, `occurred_at`, `has_more`, and `next_cursor`, then pass `next_cursor` as the next `after` value.
  </Card>

  <Card title="Extraction Job" icon="boxes">
    Start or poll extraction jobs through REST, SDKs, or MCP. Store `extraction_id`, `tool_type`, `status`, `has_more`, `next_cursor`, and the export path before loading CSV, JSON, or XLSX rows.
  </Card>
</CardGroup>
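
The camelCase to snake\_case step can be sketched as a small pure-Python helper. The input keys in the example (`tweetId`, `chargedCredits`) are assumed camelCase spellings of the handoff fields named above, not confirmed REST field names. `to_snake_case` and `to_handoff_row` are hypothetical helpers.

```python
from __future__ import annotations

import re
from typing import Any

# Zero-width boundary between a lowercase letter or digit and an uppercase letter.
_CAMEL_BOUNDARY = re.compile(r"(?<=[a-z0-9])(?=[A-Z])")


def to_snake_case(name: str) -> str:
    """Convert a camelCase key such as `tweetId` to `tweet_id`."""
    return _CAMEL_BOUNDARY.sub("_", name).lower()


def to_handoff_row(payload: dict[str, Any], keep: tuple[str, ...]) -> dict[str, Any]:
    """Rename top-level keys to snake_case and keep only the listed fields."""
    converted = {to_snake_case(key): value for key, value in payload.items()}
    return {key: converted[key] for key in keep if key in converted}


# Example with an assumed write response shape.
row = to_handoff_row(
    {"tweetId": "123", "chargedCredits": 2, "debug": True},
    keep=("tweet_id", "charged_credits"),
)
```

Storing only the compact row keeps downstream retries from depending on the full REST payload.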

## Retry Pattern

Wrap the task with Prefect options when you want retries around transient API, network, or rate-limit failures.

```python theme={null}
from prefect_xquik import search_tweets

search_recent_tweets = search_tweets.with_options(
    name="Search Recent X Posts",
    retries=2,
    retry_delay_seconds=10,
)
```

Then call the configured task inside a flow:

```python theme={null}
page = await search_recent_tweets(
    credentials,
    query="workflow orchestration lang:en",
    query_type="Latest",
    limit=100,
)
```

Respect `Retry-After` for repeated `429` responses. Keep `limit` at or below `200` for bounded tweet search pulls. When you request the next page, pass `next_cursor` as `cursor` with the same `query`, `query_type`, and `limit`.
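
One way to turn a `Retry-After` value into a wait time is sketched below. It handles both forms the HTTP specification allows: a number of seconds or an HTTP date. `retry_after_seconds` is a hypothetical helper, and how the header value reaches your code is an assumption, since this page only documents `status_code` and `response_text` on `XquikError`.

```python
from __future__ import annotations

from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(
    header_value: Optional[str],
    default: float = 10.0,
    now: Optional[datetime] = None,
) -> float:
    """Return how long to wait, falling back to `default` when unparseable."""
    if not header_value:
        return default
    value = header_value.strip()
    # Form 1: delay in whole seconds, for example "5".
    if value.isdigit():
        return float(value)
    # Form 2: HTTP date, for example "Wed, 21 Oct 2015 07:28:00 GMT".
    try:
        retry_at = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return default
    if retry_at.tzinfo is None:
        # Treat a date without zone information as UTC.
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    current = now or datetime.now(timezone.utc)
    # Never return a negative wait when the date is already in the past.
    return max(0.0, (retry_at - current).total_seconds())
```

The fallback of 10 seconds mirrors the `retry_delay_seconds=10` value used above; pick whatever default fits your schedule.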

## Failure Routing

`XquikError` exposes `status_code` and `response_text` when an HTTP response is available. Use them to decide whether the flow should fail fast, pause for account action, or retry later.

<CardGroup cols={2}>
  <Card title="Inspect Status" icon="route">
    Catch `XquikError` and branch on `status_code`. Store `response_text` with the failed run for debugging.
  </Card>

  <Card title="Fix Inputs" icon="circle-x">
    Treat `400` and `404` as non-retryable. Fix the query, ID, username, or cursor before rerunning.
  </Card>

  <Card title="Pause Billing Stops" icon="credit-card">
    Treat `402` as an account action. Check subscription or credits before the next schedule.
  </Card>

  <Card title="Back Off Reads" icon="timer">
    Treat `429`, `500`, `502`, and `503` as retryable with Prefect retry settings and slower schedules.
  </Card>
</CardGroup>
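
The routing rules above can be sketched as a small lookup. `route_failure` and its return labels are hypothetical names. Two branches are assumptions: a missing status code (no HTTP response) is treated as a transient failure, and any status not listed above fails fast.

```python
from __future__ import annotations

from typing import Optional

FAIL_FAST = frozenset({400, 404})            # fix inputs before rerunning
ACCOUNT_ACTION = frozenset({402})            # check subscription or credits
RETRYABLE = frozenset({429, 500, 502, 503})  # back off, then retry


def route_failure(status_code: Optional[int]) -> str:
    """Map an HTTP status to one of three routing labels."""
    if status_code is None:
        # Assumption: no HTTP response means a network error worth retrying.
        return "retry_later"
    if status_code in ACCOUNT_ACTION:
        return "pause_for_account"
    if status_code in RETRYABLE:
        return "retry_later"
    # 400 and 404 are non-retryable; unlisted codes also fail fast (assumption).
    return "fail_fast"
```

Inside a flow, catch `XquikError`, call `route_failure(exc.status_code)`, and store `exc.response_text` with the failed run before acting on the label.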

## Pagination Handoff

Xquik returns cursor fields for paginated reads. Store the cursor in your task result or downstream state, then pass it back unchanged on the next run.

```python theme={null}
first_page = await search_tweets(
    credentials,
    query="prefect lang:en",
    query_type="Latest",
    limit=100,
)

next_cursor = first_page.get("next_cursor") or first_page.get("nextCursor")

if next_cursor:
    second_page = await search_tweets(
        credentials,
        query="prefect lang:en",
        query_type="Latest",
        cursor=str(next_cursor),
        limit=100,
    )
```

Do not decode cursors. Treat them as opaque strings. For bounded tweet search pulls, keep `query`, `query_type`, and `limit` unchanged. Only `cursor` changes.
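
A bounded pagination loop might look like the sketch below. `collect_pages` is a hypothetical helper that takes any async callable accepting a cursor, so `fake_fetch` stands in for a real `search_tweets` call here. It assumes pages expose `tweets`, `has_next_page`, and `next_cursor` (or `nextCursor`) as described above.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable, Optional

FetchPage = Callable[[Optional[str]], Awaitable[dict]]


async def collect_pages(
    fetch_page: FetchPage,
    max_pages: int = 5,
) -> tuple[list[dict[str, Any]], Optional[str]]:
    """Collect up to `max_pages` pages and return (tweets, cursor to checkpoint)."""
    tweets: list[dict[str, Any]] = []
    cursor: Optional[str] = None
    for _ in range(max_pages):
        page = await fetch_page(cursor)
        tweets.extend(page.get("tweets") or [])
        # Pass the cursor back unchanged; never decode or modify it.
        cursor = page.get("next_cursor") or page.get("nextCursor")
        if not page.get("has_next_page") or not cursor:
            return tweets, None  # no further pages
    return tweets, cursor  # page budget reached; store the cursor for the next run


# Stand-in data source for illustration only.
_FAKE_PAGES = {
    None: {"tweets": [{"id": "1"}], "has_next_page": True, "next_cursor": "c1"},
    "c1": {"tweets": [{"id": "2"}], "has_next_page": False, "next_cursor": None},
}


async def fake_fetch(cursor: Optional[str]) -> dict:
    return _FAKE_PAGES[cursor]


if __name__ == "__main__":
    print(asyncio.run(collect_pages(fake_fetch)))
```

In a real flow, the callable would wrap `search_tweets` with a fixed `query`, `query_type`, and `limit`, passing `cursor` only when one is present, as in the example above. The `max_pages` cap keeps each scheduled run bounded in credits and requests.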

## Production Notes

<CardGroup cols={2}>
  <Card title="Credentials" icon="shield-check">
    Store API keys in Prefect blocks. Do not put API keys in deployment YAML, repository files, or logs.
  </Card>

  <Card title="Billing" icon="credit-card">
    X data reads use credits. Handle `402 no_subscription`, `402 no_credits`, and `402 insufficient_credits` before retrying.
  </Card>

  <Card title="Rate Limits" icon="timer">
    Read endpoints share one per-user bucket of 60 requests per second. After a `429`, retry only once the `Retry-After` window has passed.
  </Card>

  <Card title="Raw JSON" icon="file-json">
    Shape records in follow-up tasks before writing to Slack, Sheets, a warehouse, or a dashboard.
  </Card>

  <Card title="Read Scope" icon="database">
    Use this collection for reads. Use REST, SDKs, or MCP when a flow needs writes, monitors, webhooks, or extraction jobs.
  </Card>

  <Card title="Version Pin" icon="tag">
    Pin the release wheel in production images until PyPI publication is available.
  </Card>
</CardGroup>

## Next Steps

* Read [Rate Limits](/guides/rate-limits) for retry and backoff guidance.
* Read [Error Handling](/guides/error-handling) for `401`, `402`, `429`, and `5xx` recovery.
* Use [Extraction Workflow](/guides/extraction-workflow) when a job needs durable CSV, JSON, XLSX, Markdown, or PDF exports.
* Use [MCP Tools](/mcp/tools) when an AI agent should decide which Xquik endpoint to call.
