Skip to content

prefect.cli.concurrency_limit

Command line interface for working with concurrency limits.

create async

Create a concurrency limit against a tag.

This limit controls how many task runs with that tag may simultaneously be in a Running state.

Source code in src/prefect/cli/concurrency_limit.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@concurrency_limit_app.command()
async def create(tag: str, concurrency_limit: int):
    """
    Create a concurrency limit against a tag.

    This limit controls how many task runs with that tag may simultaneously be in a
    Running state.
    """
    import textwrap

    from prefect.client import get_client

    async with get_client() as client:
        await client.create_concurrency_limit(
            tag=tag, concurrency_limit=concurrency_limit
        )
        await client.read_concurrency_limit_by_tag(tag)

    app.console.print(
        textwrap.dedent(
            f"""
            Created concurrency limit with properties:
                tag - {tag!r}
                concurrency_limit - {concurrency_limit}

            Delete the concurrency limit:
                prefect concurrency-limit delete {tag!r}

            Inspect the concurrency limit:
                prefect concurrency-limit inspect {tag!r}
        """
        )
    )

delete async

Delete the concurrency limit set on the specified tag.

Source code in src/prefect/cli/concurrency_limit.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@concurrency_limit_app.command()
async def delete(tag: str):
    """
    Delete the concurrency limit set on the specified tag.
    """
    from prefect.cli._utilities import exit_with_error, exit_with_success
    from prefect.client import get_client
    from prefect.exceptions import ObjectNotFound

    async with get_client() as client:
        try:
            await client.delete_concurrency_limit_by_tag(tag=tag)
        except ObjectNotFound:
            exit_with_error(f"No concurrency limit found for the tag: {tag}")

    exit_with_success(f"Deleted concurrency limit set on the tag: {tag}")

inspect async

View details about a concurrency limit. active_slots shows a list of TaskRun IDs which are currently using a concurrency slot.

Source code in src/prefect/cli/concurrency_limit.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
@concurrency_limit_app.command()
async def inspect(tag: str):
    """
    View details about a concurrency limit. `active_slots` shows a list of TaskRun IDs
    which are currently using a concurrency slot.
    """

    from prefect.cli._utilities import exit_with_error
    from prefect.client import get_client
    from prefect.exceptions import ObjectNotFound

    try:
        from rich.console import Group
    except ImportError:
        # Name changed in https://github.com/Textualize/rich/blob/master/CHANGELOG.md#1100---2022-01-09
        from rich.console import RenderGroup as Group

    from rich.panel import Panel
    from rich.pretty import Pretty
    from rich.table import Table

    async with get_client() as client:
        try:
            result = await client.read_concurrency_limit_by_tag(tag=tag)
        except ObjectNotFound:
            exit_with_error(f"No concurrency limit found for the tag: {tag}")

    trid_table = Table()
    trid_table.add_column("Active Task Run IDs", style="cyan", no_wrap=True)

    cl_table = Table(title=f"Concurrency Limit ID: [red]{str(result.id)}")
    cl_table.add_column("Tag", style="green", no_wrap=True)
    cl_table.add_column("Concurrency Limit", style="blue", no_wrap=True)
    cl_table.add_column("Created", style="magenta", no_wrap=True)
    cl_table.add_column("Updated", style="magenta", no_wrap=True)

    for trid in sorted(result.active_slots):
        trid_table.add_row(str(trid))

    cl_table.add_row(
        str(result.tag),
        str(result.concurrency_limit),
        Pretty(pendulum.instance(result.created).diff_for_humans()),
        Pretty(pendulum.instance(result.updated).diff_for_humans()),
    )

    group = Group(
        cl_table,
        trid_table,
    )
    app.console.print(Panel(group, expand=False))

ls async

View all concurrency limits.

Source code in src/prefect/cli/concurrency_limit.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
@concurrency_limit_app.command()
async def ls(limit: int = 15, offset: int = 0):
    """
    View all concurrency limits.
    """
    from rich.table import Table

    from prefect.client import get_client

    table = Table(
        title="Concurrency Limits",
        caption="inspect a concurrency limit to show active task run IDs",
    )
    table.add_column("Tag", style="green", no_wrap=True)
    table.add_column("ID", justify="right", style="cyan", no_wrap=True)
    table.add_column("Concurrency Limit", style="blue", no_wrap=True)
    table.add_column("Active Task Runs", style="magenta", no_wrap=True)

    async with get_client() as client:
        concurrency_limits = await client.read_concurrency_limits(
            limit=limit, offset=offset
        )

    for cl in sorted(concurrency_limits, key=lambda c: c.updated, reverse=True):
        table.add_row(
            str(cl.tag),
            str(cl.id),
            str(cl.concurrency_limit),
            str(len(cl.active_slots)),
        )

    app.console.print(table)

reset async

Resets the concurrency limit slots set on the specified tag.

Source code in src/prefect/cli/concurrency_limit.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
@concurrency_limit_app.command()
async def reset(tag: str):
    """
    Resets the concurrency limit slots set on the specified tag.
    """
    from prefect.cli._utilities import exit_with_error, exit_with_success
    from prefect.client import get_client
    from prefect.exceptions import ObjectNotFound

    async with get_client() as client:
        try:
            await client.reset_concurrency_limit_by_tag(tag=tag)
        except ObjectNotFound:
            exit_with_error(f"No concurrency limit found for the tag: {tag}")

    exit_with_success(f"Reset concurrency limit set on the tag: {tag}")