youtube_search.search

Module to search videos on YouTube

  1"""
  2Module to search videos on YouTube
  3"""
  4#  pylint: disable=line-too-long, too-many-instance-attributes, too-many-arguments
  5
  6__all__ = ["encode_url", "YoutubeSearch", "AsyncYoutubeSearch"]
  7
  8import asyncio
  9import re
 10from typing import Iterator, List, Optional, Union
 11from unicodedata import normalize as unicode_normalize
 12import aiohttp
 13import requests
 14from .options import Options
 15
 16BASE_URL = "https://www.youtube.com"
 17
 18
 19def encode_url(url: str) -> str:
 20    """
 21    Encode url and replace space to '+'
 22
 23    Parameters
 24    ----------
 25    url: str
 26        URL
 27
 28    Returns
 29    -------
 30    str
 31    """
 32    return requests.utils.quote(url).replace("%20", "+")
 33
 34
 35class BaseYoutubeSearch:
 36    """
 37    Base class for YoutubeSearch
 38    """
 39
 40    def __init__(
 41        self,
 42        max_results: Optional[int] = None,
 43        options: Options = Options(),
 44    ):
 45        """
 46        Parameters
 47        ----------
 48        max_results : Optional[int], default 20
 49            The maximum result that will be returned. Set to None to remove the limit
 50        options : Options
 51            youtube_search options
 52        """
 53        if max_results is not None and max_results < 0:
 54            raise ValueError(
 55                "Max result must be a whole number or set to None to remove the limit"
 56            )
 57        self.json = options.json_parser
 58        self.max_results = max_results
 59        self._api_key = None
 60        self._cookies = {
 61            "PREF": f"hl={options.language}&gl={options.region}",
 62            "domain": ".youtube.com",
 63        }
 64        self._data = {}
 65        self._videos = []
 66
 67    def _get_video(self, response: Union[str, dict]) -> None:
 68        """
 69        Get video from parsed html
 70
 71        Parameters
 72        ----------
 73        response: Union[str, dict]
 74            Passed to self.__parse_html function
 75        """
 76        for contents in self._parse_html(response):
 77            if "itemSectionRenderer" not in contents:
 78                continue
 79            for video in contents.get("itemSectionRenderer", {}).get("contents", {}):
 80                if self.max_results is not None and self.count >= self.max_results:
 81                    return
 82                res = {}
 83                if "videoRenderer" not in video:
 84                    continue
 85                video_data = video.get("videoRenderer", {})
 86                owner_url_suffix = (
 87                    video_data.get("ownerText", {})
 88                    .get("runs", [{}])[0]
 89                    .get("navigationEndpoint", {})
 90                    .get("browseEndpoint", {})
 91                    .get("canonicalBaseUrl")
 92                )
 93                res["id"] = video_data.get("videoId", None)
 94                res["thumbnails"] = [
 95                    thumb.get("url", None)
 96                    for thumb in video_data.get("thumbnail", {}).get("thumbnails", [{}])
 97                ]
 98                res["title"] = (
 99                    video_data.get("title", {}).get("runs", [[{}]])[0].get("text", None)
100                )
101                res["desc_snippet"] = unicode_normalize(
102                    "NFKD",
103                    "".join(
104                        [
105                            item.get("text", "")
106                            for item in video_data.get(
107                                "detailedMetadataSnippets", [{}]
108                            )[0]
109                            .get("snippetText", {})
110                            .get("runs", [{}])
111                        ]
112                    ),
113                )
114                res["channel"] = (
115                    video_data.get("longBylineText", {})
116                    .get("runs", [[{}]])[0]
117                    .get("text", None)
118                )
119                res["duration"] = video_data.get("lengthText", {}).get("simpleText", 0)
120                res["views"] = video_data.get("viewCountText", {}).get("simpleText", 0)
121                res["publish_time"] = video_data.get("publishedTimeText", {}).get(
122                    "simpleText", 0
123                )
124                res["url_suffix"] = (
125                    video_data.get("navigationEndpoint", {})
126                    .get("commandMetadata", {})
127                    .get("webCommandMetadata", {})
128                    .get("url", None)
129                )
130                res["owner_url"] = f"{BASE_URL}{owner_url_suffix}"
131                res["owner_name"] = (
132                    video_data.get("ownerText", {}).get("runs", [{}])[0].get("text")
133                )
134                self._videos.append(res)
135
136    def _parse_html(self, response: Union[str, dict]) -> Iterator[list]:
137        """
138        Parse the html response to get the videos
139
140        Parameters
141        ----------
142        response: Union[str, dict]
143            The response body
144
145        Returns
146        -------
147        Iterator[list]
148            Contains list of video data
149        """
150        if self._api_key:
151            return (
152                response.get("onResponseReceivedCommands", [{}])[0]
153                .get("appendContinuationItemsAction", {})
154                .get("continuationItems", [])
155            )
156
157        start = response.index("ytInitialData") + len("ytInitialData") + 3
158        end = response.index("};", start) + 1
159        json_str = response[start:end]
160        data = self.json.loads(json_str)
161        self._api_key = re.search(
162            r"(?:\"INNERTUBE_API_KEY\":\")(?P<api_key>[A-Za-z0-9_-]+)(?:\",)",
163            response,
164        )["api_key"]
165        self._data["context"] = self.json.loads(
166            re.search(
167                r"(?:\"INNERTUBE_CONTEXT\"\:)(?P<context>\{(.*)\})(?:,\"INNERTUBE_CONTEXT_CLIENT_NAME\")",
168                response,
169                re.DOTALL,
170            )["context"]
171        )
172        self._data["continuation"] = re.search(
173            r"(?:\"continuationCommand\":{\"token\":\")(?P<token>.+)(?:\",\"request\")",
174            response,
175        )["token"]
176        return data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"][
177            "sectionListRenderer"
178        ]["contents"]
179
180    @property
181    def count(self) -> int:
182        """
183        Returns
184        -------
185        int
186            How many video are in the list
187        """
188        return len(self._videos)
189
190    def list(self, clear_cache: bool = True) -> List[dict]:
191        """
192        Return the list of videos
193
194        Parameters
195        ----------
196        clear_cache: bool, default True
197            Clear the result cache
198
199        Return
200        ------
201        List[dict]:
202            The list of videos
203        """
204        result = self._videos.copy()
205        if clear_cache:
206            self._videos.clear()
207        return result
208
209
class YoutubeSearch(BaseYoutubeSearch):
    """
    Entry point class for youtube searching

    Synchronous client built on ``requests``; usable as a context manager.
    """

    def __init__(
        self,
        max_results: Optional[int] = None,
        options: Optional[Options] = None,
        session: Optional[requests.Session] = None,
    ):
        """
        Parameters
        ----------
        max_results : Optional[int], default None
            The maximum result that will be returned. Set to None to remove the limit
        options : Optional[Options], default None
            youtube_search options. ``Options()`` is used when omitted
        session : Optional[requests.Session], default None
            Requests session. When omitted a session is created here and
            closed again by close()
        """
        if options is None:
            # Resolved here (not in the signature) so that each instance gets
            # its own default Options object.
            options = Options()
        super().__init__(max_results, options)
        # NOTE(review): this assigns a module attribute of requests, so it
        # applies process-wide; presumably it makes resp.json() decode with
        # the configured parser - confirm.
        requests.models.complexjson = self.json
        self._requests_kwargs = {"timeout": options.timeout, "proxies": options.proxy}
        # Identity check: a caller-provided session is custom whatever its truth value.
        self.__is_custom_session = session is not None
        self.__session = requests.Session() if session is None else session

    def __enter__(self) -> "YoutubeSearch":
        return self

    def __exit__(self, *args) -> None:
        self.close()

    def __search(self, query: str, first: bool = False):
        """
        Search wrapper

        Parameters
        ----------
        query: str
            Search query (only used when first is True)
        first: bool, default False
            Is the first time search the query

        Raises
        ------
        requests.HTTPError
            If the server answers with an error status
        """
        if first:
            # First page: the regular HTML results page.
            url = f"{BASE_URL}/results?search_query={encode_url(query)}"
            resp = self.__session.get(
                url, cookies=self._cookies, **self._requests_kwargs
            )
            resp.raise_for_status()
            self._get_video(resp.text)
            return
        # Follow-up page: JSON endpoint, API key passed as the "key" parameter.
        url = f"{BASE_URL}/youtubei/v1/search?key={self._api_key}&prettyPrint=false"
        resp = self.__session.post(
            url,
            cookies=self._cookies,
            data=self.json.dumps(self._data),
            # The body is pre-serialised JSON, so the type is declared explicitly.
            headers={"content-type": "application/json"},
            **self._requests_kwargs,
        )
        resp.raise_for_status()
        self._get_video(resp.json())

    @property
    def is_custom_session(self) -> bool:
        """
        Returns
        -------
        bool
            Return True if user using custom session
        """
        return self.__is_custom_session

    def close(self) -> None:
        """
        Reset the search state and close the session

        A session passed in by the caller is left open.
        """
        self._api_key = None
        self._data.clear()
        self._videos.clear()
        if not self.is_custom_session:
            self.__session.close()

    def search(self, query: Optional[str] = None, pages: int = 1) -> "YoutubeSearch":
        """
        Run a search and cache the results (read them with list())

        Parameters
        ----------
        query : Optional[str], default None
            Search query. Leave empty to fetch further pages of the last query
        pages : int, default 1
            How many pages to request

        Returns
        -------
        self
            YoutubeSearch object

        Raises
        ------
        ValueError
            If no query is given and there is no previous search to continue
        """
        self._videos.clear()
        if query:
            # A new query starts from scratch.
            self._api_key = None
            self._data.clear()
        elif self._api_key is None:
            raise ValueError("Last search query not found!")
        for page in range(pages):
            self.__search(query, bool(query) and page == 0)
            if self.max_results is not None and self.count >= self.max_results:
                # The cache is full; further requests could not add anything.
                break
        return self
320
321
class AsyncYoutubeSearch(BaseYoutubeSearch):
    """
    Entry point class for youtube searching

    Asynchronous client built on ``aiohttp``; usable as an async context manager.
    """

    def __init__(
        self,
        max_results: Optional[int] = None,
        options: Optional[Options] = None,
        session: Optional[aiohttp.ClientSession] = None,
    ):
        """
        Parameters
        ----------
        max_results : Optional[int], default None
            The maximum result that will be returned. Set to None to remove the limit
        options : Optional[Options], default None
            youtube_search options. ``Options()`` is used when omitted
        session : Optional[aiohttp.ClientSession], default None
            aiohttp client session. When omitted a session is created here
            and closed again by close()
        """
        if options is None:
            # Resolved here (not in the signature) so that each instance gets
            # its own default Options object.
            options = Options()
        super().__init__(max_results, options)
        # The "domain" entry is not sent as a cookie by this client.
        self._cookies.pop("domain", None)
        self._requests_kwargs = {"timeout": options.timeout}
        if isinstance(options.proxy, dict):
            # Only the "https" entry of a requests-style proxy mapping is used.
            self._requests_kwargs["proxy"] = options.proxy.get("https", "")
        # Identity check: a caller-provided session is custom whatever its truth value.
        self.__is_custom_session = session is not None
        self.__session = aiohttp.ClientSession() if session is None else session

    async def __aenter__(self) -> "AsyncYoutubeSearch":
        return self

    async def __aexit__(self, *args) -> None:
        await self.close()

    async def __search(self, query: str, first: bool = False):
        """
        Search wrapper

        Parameters
        ----------
        query: str
            Search query (only used when first is True)
        first: bool, default False
            Is the first time search the query
        """
        if first:
            # First page: the regular HTML results page.
            url = f"{BASE_URL}/results?search_query={encode_url(query)}"
            async with self.__session.get(
                url, cookies=self._cookies, **self._requests_kwargs
            ) as resp:
                resp.raise_for_status()
                body = await resp.text()
            self._get_video(body)
            return
        # Follow-up page: JSON endpoint, API key passed as the "key" parameter.
        url = f"{BASE_URL}/youtubei/v1/search?key={self._api_key}&prettyPrint=false"
        async with self.__session.post(
            url,
            cookies=self._cookies,
            data=self.json.dumps(self._data),
            headers={"content-type": "application/json"},
            **self._requests_kwargs,
        ) as resp:
            resp.raise_for_status()
            body = await resp.json(loads=self.json.loads)
        self._get_video(body)

    @property
    def is_custom_session(self) -> bool:
        """
        Returns
        -------
        bool
            Return True if user using custom session
        """
        return self.__is_custom_session

    async def close(self) -> None:
        """
        Reset the search state and close the session

        A session passed in by the caller is left open.
        """
        self._api_key = None
        self._data.clear()
        self._videos.clear()
        if not self.is_custom_session:
            await self.__session.close()
            await asyncio.sleep(
                0.250
            )  #  https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown

    async def search(
        self, query: Optional[str] = None, pages: int = 1
    ) -> "AsyncYoutubeSearch":
        """
        Run a search and cache the results (read them with list())

        Parameters
        ----------
        query : Optional[str], default None
            Search query. Leave empty to fetch further pages of the last query
        pages : int, default 1
            How many pages to request

        Returns
        -------
        self
            AsyncYoutubeSearch object

        Raises
        ------
        ValueError
            If no query is given and there is no previous search to continue
        """
        self._videos.clear()
        if query:
            # A new query starts from scratch.
            self._api_key = None
            self._data.clear()
        elif self._api_key is None:
            raise ValueError("Last search query not found!")
        remaining = pages
        if query and pages > 0:
            await self.__search(query, True)  # Get the api key and data first
            remaining -= 1
        # NOTE(review): the follow-up requests are started together, so each
        # one serialises whatever continuation payload self._data holds at
        # that moment - confirm they do not all ask for the same page.
        await asyncio.gather(*(self.__search(query) for _ in range(remaining)))
        return self
def encode_url(url: str) -> str:
def encode_url(url: str) -> str:
    """
    Percent-encode a string for use in a URL, writing spaces as '+'

    Parameters
    ----------
    url: str
        Text to encode (this module passes the search query)

    Returns
    -------
    str
        The encoded text: '/' and unreserved characters are kept, every
        space becomes '+', everything else is percent-encoded
    """
    # Local import: use the standard-library encoder directly rather than
    # reaching it through the ``requests.utils`` namespace.
    from urllib.parse import quote_plus

    # safe="/" keeps '/' unescaped, matching the default of ``quote``.
    return quote_plus(url, safe="/")

Encode url and replace space to '+'

Parameters
  • url (str): URL
Returns
  • str
class YoutubeSearch(BaseYoutubeSearch):
class YoutubeSearch(BaseYoutubeSearch):
    """
    Entry point class for youtube searching

    Synchronous client built on ``requests``; usable as a context manager.
    """

    def __init__(
        self,
        max_results: Optional[int] = None,
        options: Optional[Options] = None,
        session: Optional[requests.Session] = None,
    ):
        """
        Parameters
        ----------
        max_results : Optional[int], default None
            The maximum result that will be returned. Set to None to remove the limit
        options : Optional[Options], default None
            youtube_search options. ``Options()`` is used when omitted
        session : Optional[requests.Session], default None
            Requests session. When omitted a session is created here and
            closed again by close()
        """
        if options is None:
            # Resolved here (not in the signature) so that each instance gets
            # its own default Options object.
            options = Options()
        super().__init__(max_results, options)
        # NOTE(review): this assigns a module attribute of requests, so it
        # applies process-wide; presumably it makes resp.json() decode with
        # the configured parser - confirm.
        requests.models.complexjson = self.json
        self._requests_kwargs = {"timeout": options.timeout, "proxies": options.proxy}
        # Identity check: a caller-provided session is custom whatever its truth value.
        self.__is_custom_session = session is not None
        self.__session = requests.Session() if session is None else session

    def __enter__(self) -> "YoutubeSearch":
        return self

    def __exit__(self, *args) -> None:
        self.close()

    def __search(self, query: str, first: bool = False):
        """
        Search wrapper

        Parameters
        ----------
        query: str
            Search query (only used when first is True)
        first: bool, default False
            Is the first time search the query

        Raises
        ------
        requests.HTTPError
            If the server answers with an error status
        """
        if first:
            # First page: the regular HTML results page.
            url = f"{BASE_URL}/results?search_query={encode_url(query)}"
            resp = self.__session.get(
                url, cookies=self._cookies, **self._requests_kwargs
            )
            resp.raise_for_status()
            self._get_video(resp.text)
            return
        # Follow-up page: JSON endpoint, API key passed as the "key" parameter.
        url = f"{BASE_URL}/youtubei/v1/search?key={self._api_key}&prettyPrint=false"
        resp = self.__session.post(
            url,
            cookies=self._cookies,
            data=self.json.dumps(self._data),
            # The body is pre-serialised JSON, so the type is declared explicitly.
            headers={"content-type": "application/json"},
            **self._requests_kwargs,
        )
        resp.raise_for_status()
        self._get_video(resp.json())

    @property
    def is_custom_session(self) -> bool:
        """
        Returns
        -------
        bool
            Return True if user using custom session
        """
        return self.__is_custom_session

    def close(self) -> None:
        """
        Reset the search state and close the session

        A session passed in by the caller is left open.
        """
        self._api_key = None
        self._data.clear()
        self._videos.clear()
        if not self.is_custom_session:
            self.__session.close()

    def search(self, query: Optional[str] = None, pages: int = 1) -> "YoutubeSearch":
        """
        Run a search and cache the results (read them with list())

        Parameters
        ----------
        query : Optional[str], default None
            Search query. Leave empty to fetch further pages of the last query
        pages : int, default 1
            How many pages to request

        Returns
        -------
        self
            YoutubeSearch object

        Raises
        ------
        ValueError
            If no query is given and there is no previous search to continue
        """
        self._videos.clear()
        if query:
            # A new query starts from scratch.
            self._api_key = None
            self._data.clear()
        elif self._api_key is None:
            raise ValueError("Last search query not found!")
        for page in range(pages):
            self.__search(query, bool(query) and page == 0)
            if self.max_results is not None and self.count >= self.max_results:
                # The cache is full; further requests could not add anything.
                break
        return self

Entry point class for youtube searching

YoutubeSearch( max_results: Union[int, NoneType] = None, options: youtube_search.options.Options = Options(json_parser=<module 'json' from '/opt/hostedtoolcache/Python/3.8.17/x64/lib/python3.8/json/__init__.py'>, language=None, timeout=10, proxy=None, region=None), session: Union[requests.sessions.Session, NoneType] = None)
216    def __init__(
217        self,
218        max_results: Optional[int] = None,
219        options: Options = Options(),
220        session: Optional[requests.Session] = None,
221    ):
222        """
223        Parameters
224        ----------
225        max_results : Optional[int], default 20
226            The maximum result that will be returned. Set to None to remove the limit
227        options : Options
228            youtube_search options
229        session : Optional[requests.Session], default None
230            Requests session
231        """
232        super().__init__(max_results, options)
233        requests.models.complexjson = self.json
234        self._requests_kwargs = {"timeout": options.timeout, "proxies": options.proxy}
235        self.__session = requests.Session() if session is None else session
236        self.__is_custom_session = bool(session)
Parameters
  • max_results (Optional[int], default None): The maximum number of results that will be returned. Set to None to remove the limit
  • options (Options): youtube_search options
  • session (Optional[requests.Session], default None): Requests session
is_custom_session: bool
Returns
  • bool: Return True if user using custom session
def close(self) -> None:
285    def close(self) -> None:
286        """
287        Close the context manager
288        """
289        self._api_key = None
290        self._data.clear()
291        self._videos.clear()
292        if not self.is_custom_session:
293            self.__session.close()

Close the context manager

def search( self, query: str = None, pages: int = 1) -> youtube_search.search.YoutubeSearch:
295    def search(self, query: str = None, pages: int = 1) -> "YoutubeSearch":
296        """
297        Parameters
298        ----------
299        query : str
300            Search query
301        pages : str
302            How many page you wanna scroll
303
304        Returns
305        -------
306        self
307            YoutubeSearch object
308        """
309        self._videos.clear()
310        if query:
311            self._api_key = None
312            self._data.clear()
313        if self._api_key is None and not query:
314            raise ValueError("Last search query not found!")
315        for i in range(pages):
316            if i == 0 and query:
317                self.__search(query, True)
318                continue
319            self.__search(query)
320        return self
Parameters
  • query (str): Search query
  • pages (int): How many result pages to request
Returns
  • self: YoutubeSearch object
class AsyncYoutubeSearch(BaseYoutubeSearch):
class AsyncYoutubeSearch(BaseYoutubeSearch):
    """
    Entry point class for youtube searching

    Asynchronous client built on ``aiohttp``; usable as an async context manager.
    """

    def __init__(
        self,
        max_results: Optional[int] = None,
        options: Optional[Options] = None,
        session: Optional[aiohttp.ClientSession] = None,
    ):
        """
        Parameters
        ----------
        max_results : Optional[int], default None
            The maximum result that will be returned. Set to None to remove the limit
        options : Optional[Options], default None
            youtube_search options. ``Options()`` is used when omitted
        session : Optional[aiohttp.ClientSession], default None
            aiohttp client session. When omitted a session is created here
            and closed again by close()
        """
        if options is None:
            # Resolved here (not in the signature) so that each instance gets
            # its own default Options object.
            options = Options()
        super().__init__(max_results, options)
        # The "domain" entry is not sent as a cookie by this client.
        self._cookies.pop("domain", None)
        self._requests_kwargs = {"timeout": options.timeout}
        if isinstance(options.proxy, dict):
            # Only the "https" entry of a requests-style proxy mapping is used.
            self._requests_kwargs["proxy"] = options.proxy.get("https", "")
        # Identity check: a caller-provided session is custom whatever its truth value.
        self.__is_custom_session = session is not None
        self.__session = aiohttp.ClientSession() if session is None else session

    async def __aenter__(self) -> "AsyncYoutubeSearch":
        return self

    async def __aexit__(self, *args) -> None:
        await self.close()

    async def __search(self, query: str, first: bool = False):
        """
        Search wrapper

        Parameters
        ----------
        query: str
            Search query (only used when first is True)
        first: bool, default False
            Is the first time search the query
        """
        if first:
            # First page: the regular HTML results page.
            url = f"{BASE_URL}/results?search_query={encode_url(query)}"
            async with self.__session.get(
                url, cookies=self._cookies, **self._requests_kwargs
            ) as resp:
                resp.raise_for_status()
                body = await resp.text()
            self._get_video(body)
            return
        # Follow-up page: JSON endpoint, API key passed as the "key" parameter.
        url = f"{BASE_URL}/youtubei/v1/search?key={self._api_key}&prettyPrint=false"
        async with self.__session.post(
            url,
            cookies=self._cookies,
            data=self.json.dumps(self._data),
            headers={"content-type": "application/json"},
            **self._requests_kwargs,
        ) as resp:
            resp.raise_for_status()
            body = await resp.json(loads=self.json.loads)
        self._get_video(body)

    @property
    def is_custom_session(self) -> bool:
        """
        Returns
        -------
        bool
            Return True if user using custom session
        """
        return self.__is_custom_session

    async def close(self) -> None:
        """
        Reset the search state and close the session

        A session passed in by the caller is left open.
        """
        self._api_key = None
        self._data.clear()
        self._videos.clear()
        if not self.is_custom_session:
            await self.__session.close()
            await asyncio.sleep(
                0.250
            )  #  https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown

    async def search(
        self, query: Optional[str] = None, pages: int = 1
    ) -> "AsyncYoutubeSearch":
        """
        Run a search and cache the results (read them with list())

        Parameters
        ----------
        query : Optional[str], default None
            Search query. Leave empty to fetch further pages of the last query
        pages : int, default 1
            How many pages to request

        Returns
        -------
        self
            AsyncYoutubeSearch object

        Raises
        ------
        ValueError
            If no query is given and there is no previous search to continue
        """
        self._videos.clear()
        if query:
            # A new query starts from scratch.
            self._api_key = None
            self._data.clear()
        elif self._api_key is None:
            raise ValueError("Last search query not found!")
        remaining = pages
        if query and pages > 0:
            await self.__search(query, True)  # Get the api key and data first
            remaining -= 1
        # NOTE(review): the follow-up requests are started together, so each
        # one serialises whatever continuation payload self._data holds at
        # that moment - confirm they do not all ask for the same page.
        await asyncio.gather(*(self.__search(query) for _ in range(remaining)))
        return self

Entry point class for youtube searching

AsyncYoutubeSearch( max_results: Union[int, NoneType] = None, options: youtube_search.options.Options = Options(json_parser=<module 'json' from '/opt/hostedtoolcache/Python/3.8.17/x64/lib/python3.8/json/__init__.py'>, language=None, timeout=10, proxy=None, region=None), session: Union[aiohttp.client.ClientSession, NoneType] = None)
328    def __init__(
329        self,
330        max_results: Optional[int] = None,
331        options: Options = Options(),
332        session: Optional[aiohttp.ClientSession] = None,
333    ):
334        """
335        Parameters
336        ----------
337        max_results : Optional[int], default 20
338            The maximum result that will be returned. Set to None to remove the limit
339        options : Options
340            youtube_search options
341        session : Optional[aiohttp.ClientSession], default None
342            aiohttp client session
343        """
344        super().__init__(max_results, options)
345        if "domain" in self._cookies:
346            self._cookies.pop("domain")
347        self._requests_kwargs = {"timeout": options.timeout}
348        if isinstance(options.proxy, dict):
349            self._requests_kwargs["proxy"] = options.proxy.get("https", "")
350        self.__session = aiohttp.ClientSession() if session is None else session
351        self.__is_custom_session = bool(session)
Parameters
  • max_results (Optional[int], default None): The maximum number of results that will be returned. Set to None to remove the limit
  • options (Options): youtube_search options
  • session (Optional[aiohttp.ClientSession], default None): aiohttp client session
is_custom_session: bool
Returns
  • bool: Return True if user using custom session
async def close(self) -> None:
401    async def close(self) -> None:
402        """
403        Close the context manager
404        """
405        self._api_key = None
406        self._data.clear()
407        self._videos.clear()
408        if not self.is_custom_session:
409            await self.__session.close()
410            await asyncio.sleep(
411                0.250
412            )  #  https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown

Close the context manager

async def search( self, query: str = None, pages: int = 1) -> youtube_search.search.AsyncYoutubeSearch:
414    async def search(self, query: str = None, pages: int = 1) -> "AsyncYoutubeSearch":
415        """
416        Parameters
417        ----------
418        query : str
419            Search query
420        pages : str
421            How many page you wanna scroll
422
423        Returns
424        -------
425        self
426            AsyncYoutubeSearch object
427        """
428        self._videos.clear()
429        if query:
430            self._api_key = None
431            self._data.clear()
432        if self._api_key is None and not query:
433            raise ValueError("Last search query not found!")
434        tasks = []
435        for i in range(pages):
436            if i == 0 and query:
437                await self.__search(query, True)  # Get the api key and data first
438                continue
439            tasks.append(self.__search(query))
440        await asyncio.gather(*tasks)
441        return self
Parameters
  • query (str): Search query
  • pages (int): How many result pages to request
Returns
  • self: AsyncYoutubeSearch object