Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
osbre committed Jun 8, 2024
1 parent 0986c70 commit 636e0ea
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 27 deletions.
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,38 @@ pip install streampot
```
## Usage

Here's a simple example that converts a video file to an audio file:

```python
from streampot import StreamPot

client = StreamPot(secret='yourToken')

job = client.input('https://download.samplelib.com/mp4/sample-5s.mp4') \
.output('audio.mp3') \
.run_and_wait()

print(job.outputs['audio.mp3'])
```

If you want to run the job in the background, you can use the `run` method:

```python
job = client.input('https://download.samplelib.com/mp4/sample-5s.mp4') \
.output('audio.mp3') \
.run()
```

And fetch the job info using the `get_job` method:

```python
job = client.get_job(job.id)

print(job.status)
print(job.outputs['audio.mp3']) # output url by file name
print(job.logs) # error logs if any
print(job.created_at)
print(job.completed_at)
```

## Development
Expand Down
54 changes: 27 additions & 27 deletions streampot/_client.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
import requests
from dataclasses import dataclass
from typing import List, Optional
from enum import Enum, auto
from typing import Optional
from enum import Enum
import time


@dataclass
class Asset:
name: str
url: str


class JobStatus(Enum):
PENDING = auto()
COMPLETED = auto()
FAILED = auto()
UPLOADING = auto()
PENDING = "pending"
COMPLETED = "completed"
FAILED = "failed"
UPLOADING = "uploading"


@dataclass
class JobEntity:
id: int
status: JobStatus
created_at: str
assets: Optional[List[Asset]] = None
logs: Optional[str] = None
outputs: Optional[dict] = None
completed_at: Optional[str] = None


def _response_to_job(data: dict) -> JobEntity:
data['status'] = JobStatus[data['status'].upper()]
return JobEntity(**data)


class StreamPotClient:
Expand All @@ -32,19 +33,17 @@ def __init__(self, secret: str, base_url: str = 'https://api.streampot.io/v1'):
self.base_url = base_url
self.actions = []

def get_job(self, job_id: int) -> dict:
def get_job(self, job_id: int) -> JobEntity:
response = requests.get(f"{self.base_url}/jobs/{job_id}", headers=self._auth_header())
response.raise_for_status()
return response.json()

return _response_to_job(response.json())

def run(self) -> JobEntity:
response = requests.post(f"{self.base_url}/", headers=self._auth_header(json=True), json=self.actions)
response = requests.post(f"{self.base_url}/", headers=self._auth_header(), json=self.actions)
response.raise_for_status()

job_data = response.json()
job_data['status'] = JobStatus[job_data['status'].upper()]

return JobEntity(**job_data)
return _response_to_job(response.json())

def run_and_wait(self, interval_ms: int = 1000) -> JobEntity:
job = self.run()
Expand All @@ -53,15 +52,16 @@ def run_and_wait(self, interval_ms: int = 1000) -> JobEntity:
job = self.get_job(job.id)
return job

def _auth_header(self, json: bool = False) -> dict:
headers = {"Authorization": f"Bearer {self.secret}"}
if json:
headers['Accept'] = 'application/json'
headers['Content-Type'] = 'application/json'
return headers
def _auth_header(self) -> dict:
return {
"Authorization": f"Bearer {self.secret}",
"Accept": 'application/json',
"Content-Type": 'application/json'
}

def _add_action(self, name: str, *values):
self.actions.append({"name": name, "value": values})
self.actions.append({"name": name, "value": list(values)})
return self

def merge_add(self, source: str):
return self._add_action('mergeAdd', source)
Expand Down

0 comments on commit 636e0ea

Please sign in to comment.