skit_pipelines.api package

Subpackages

Submodules

skit_pipelines.api.auth module

authenticate_user(username: str, password: str, db={'tog-service@skit.ai': {'hashed_password': '$2b$12$1zrry6JD9ic7i6FNp5s6Beyobujnn7zeFOTyDI6bLQU2adGojlwa.', 'username': 'tog-service@skit.ai'}})[source]
create_access_token(data: dict, expires_delta: Optional[datetime.timedelta] = None)[source]
get_user(db, username: str)[source]
get_users_db()[source]
async valid_user(token: str = Depends(OAuth2PasswordBearer))[source]
verify_password(plain_password, hashed_password)[source]

skit_pipelines.api.endpoints module

class RunPipelineResult(client, run_info)[source]

Bases: object

wait_for_run_completion(timeout=None)[source]
async arguments_validation_exception_handler(request, exc)[source]
async endpoint(req: starlette.requests.Request)[source]
handle_app_mention_events(body, say, logger)[source]

This function is called when the bot (@charon) is called in any slack channel.

Parameters
  • body ([type]) – [description]

  • say ([type]) – [description]

  • _ ([type]) – [description]

health_check()[source]

Get server status health. The purpose of this API is to help other people/machines know liveness of the application.

async kfp_api_exception_handler(request, exc)[source]
async login_for_access_token(form_data: fastapi.security.oauth2.OAuth2PasswordRequestForm = Depends(NoneType))[source]
pipeline_run_req(*, namespace: str, pipeline_name: str, payload: Dict[str, Any], background_tasks: starlette.background.BackgroundTasks, _: str = Depends(valid_user))[source]
run_kfp_pipeline_func(kf_client: kfp._client.Client, pipeline_func: Callable, params: Dict[str, Any], pipeline_name: str, experiment_name: str = 'Default', namespace: str = 'skit') skit_pipelines.api.endpoints.RunPipelineResult[source]
async schedule_run_completion(client_resp: skit_pipelines.api.endpoints.RunPipelineResult, namespace: str, webhook_url: str, slack_channel: Optional[str] = None, slack_thread: Optional[float] = None)[source]

skit_pipelines.api.slack_bot module

authenticate_bot()[source]
command_parser(text: str) Tuple[Optional[str], Optional[str], Optional[Dict[str, Any]]][source]

Parses pipeline run commands.

Note

We assume commands to follow the following template:

@slackbot run <pipeline_name> ` { "param_1": "value", } `

This allows us to parse the pipeline name and arguments conveniently but there are certain pitfalls we need to keep in mind.

  1. We slack markdown auto-format for links. if the value is a link it will be automatically formatted

    by slack and users have no way to remove it in the code-blocks. Either way it leads to poor UX. The link format is either <link> or <link|description>.

  2. KFP doesn’t like arguments other than strings and numbers. So for pipelines that may need dictionaries, we serialize

    the dictionary to JSON and pass it as a string. This is inconvenient because users need to remember escaping strings, the payload quickly becomes unreadable in case of any nesting.

We handle [1] by parsing the markdown and using only the link since that’s the user’s expectation. For [2] we require users to pass indented, neat, dictionaries. The serialization is done here so that pipelines are happy.

Parameters

text (str) – The command text.

Returns

The command, pipeline name, and payload.

Return type

Tuple[str]

decode_payload(encoded_payload: str) Optional[Dict[str, Any]][source]

Decodes a base64 string to JSON

Parameters

encoded_payload (str) – A base64 string.

Returns

decoded JSON.

Return type

PayloadType

encode_payload(payload: Optional[Dict[str, Any]]) str[source]

Encodes a JSON to base64 string

Parameters

payload (PayloadType) – A JSON payload.

Returns

BASE64 encoded payload.

Return type

str

get_message_data(body: Dict[str, Any])[source]

A message from a user to the slack bot.

Removing attributes other than event for brevity, even within event.blocks is redacted. It is helpful for markdown content that users may send.

{
    "event": {
        "client_msg_id": "<UUID>",
        "type": "message",
        "text": "help",
        "user": "[A-Z0-9]+",
        "ts": "1654862590.595759",
        "team": "[A-Z0-9]+",
        "blocks": [{}],
        "channel": "[A-Z0-9]+",
        "event_ts": "1654862590.595759",
        "channel_type": "im"
    }
}
Parameters

body – The message event.

Returns

_description_

Return type

_type_

help_message()[source]
make_response(channel_id, message_ts, text, user)[source]

Makes response for a given command in text.

To schedule a run one can do -

` @charon create_recurring <pipeline_name> <json parameters inside blockquote> `

then it’ll respond with -

` To create a recurring run of <pipeline_name> use: /remind #bots "@charon run <pipeline_name> <encoded parameters>" <In ten minutes/30 May/Every Tuesday/etc> `

copy paste and change the time depending on when pipeline should run, this will trigger slackbot reminders -> make the pipeline run accordingly.

make_run_requests(url_path, payload, access_token)[source]
read_access_token(token_fetch=False) str[source]
recurr_run_format(pipeline_name: str, encoded_payload: str, channel_id: str, message_ts, user)[source]
run_pipeline(pipeline_name, payload, channel_id=None, message_ts=None, user=None, server_port: int = 9991)[source]

skit_pipelines.api.validate_input module

SUPPORTED_LANG_CODES = ['en-US', 'en', 'hi', 'ta', 'te', 'ma', 'gu', 'mr']

Collection of checks for input parameters that are passed to various skit-pipelines. Ideally these should be on the level of pipelines but currently we start with keeping them universal.

To add a check:

1. Add a new function with the validation check (Do ensure that the param is actually present in the payload before accessing the validation condition.) 2. If validation fails, append the error to self.errors 3. Call the validation function from validate_input_params()

class ValidateInput(payload, pipeline_name)[source]

Bases: object

validate_input_params()[source]

Module contents