YouTube is full of content and media, which makes it a great place to collect and scrape data. In this tutorial, I’ll explain how to scrape videos on YouTube, with extra methods and features to make the task easy and fun.
Python is a great place to start. Its syntax is beginner friendly, and its wealth of libraries makes scraping straightforward. We’ll be using a handful of libraries to help us out; the major ones are Selenium, multiprocessing, requests, and queue.
Let’s start by setting up our imports…
# ----- Imports and Libraries ----- #
import multiprocessing
import queue
import re
import time
import requests
import chromedriver_autoinstaller
from threading import Thread
# ----- Selenium Libraries ----- #
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
Now that we’ve set up all our imports, we can create the classes and functions that do the job. We’ll start with a class called “Scrape”.
First we initialize the class and create our instance variables: the search query (the videos we search for), the link count (the minimum number of links we want, unless the run times out), and the view-count and sub-count variables, which set the maximum views or subscribers allowed per video. Finally, self.url builds the URL to open and appends '&sp=CAO%253D', which filters the results to the most recently uploaded videos.
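The original snippet for this first part of __init__ isn’t reproduced in this extract; a minimal sketch of it, with parameter names assumed, might look like this:

```python
import urllib.parse

class Scrape:
    def __init__(self, search_query, link_count, max_views=None, max_subs=None):
        self.search_query = search_query  # the videos we search for
        self.link_count = link_count      # minimum number of links to gather (unless we time out)
        self.max_views = max_views        # maximum views allowed per video (None = no limit)
        self.max_subs = max_subs          # maximum subscribers allowed per channel (None = no limit)
        # '&sp=CAO%253D' filters the results to the most recently uploaded videos
        self.url = ("https://www.youtube.com/results?search_query="
                    + urllib.parse.quote_plus(search_query) + "&sp=CAO%253D")
```

Note that quote_plus escapes the search query so spaces and special characters are safe to put in a URL.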
self.filtered_links = queue.Queue()
self.links = set()
self.timeout = False
self.links_gathered = False
self.position = 0
self.manager = multiprocessing.Manager()
self.return_dict = self.manager.dict()
self.channels = {}  # Used to store viewed channels and their subs
Now add this snippet right after the existing code inside __init__. Here we declare some more variables: self.filtered_links = queue.Queue() creates a queue that holds URLs without duplicates; self.links = set() likewise ensures we don’t save any duplicate links; self.timeout supports a timeout function we create in case gathering links takes too long; self.return_dict is used to store our emails; and finally self.channels keeps track of channels we’ve already seen and their subscriber counts (this makes the code faster and more efficient, since we never look a channel up twice).
# ----- Driver Setup and Options ----- #
chromedriver_autoinstaller.install(cwd=True)  # Get latest version of chromedriver // cwd=True -> saves driver in directory
self.options = webdriver.ChromeOptions()
self.options.add_argument("--incognito")
self.options.add_argument("--log-level=OFF")
self.options.add_argument('--disable-gpu')
self.options.add_argument("--headless")  # Sets headless mode for no window popup
self.__FetchVideos()  # Fetch URLs
Next we set up Selenium’s driver and options and call our __FetchVideos function. We use the chromedriver_autoinstaller library so we always get the correct, up-to-date version of chromedriver automatically. We pass a few arguments, such as --incognito, which runs Chrome in incognito mode, and --headless, so Chrome doesn’t pop up a window and runs seamlessly in the background.
Now let’s create our __FetchVideos function, which gathers the URLs matching our parameters. We define __FetchVideos(self), declare some variables, and start up the Chrome driver. We open it with a with statement, a context manager that ensures the driver opens and closes safely. We then call driver.get(self.url), which opens Chrome and navigates to the videos page, and finally create and start a thread for our timer function.
Add this right under the previous code. Here we render the page and gather URLs. We wrap everything in a try/except to catch any errors (typically YouTube popping up a captcha); if we catch one, we close the driver just in case and exit. We start with a while loop that runs until the required number of links is gathered or timeout is set to True. Inside it, a for loop uses WebDriverWait until all elements are visible, and an XPath selector pulls the URLs of the videos. If the view-count or sub-count parameters need checking, we enter an if statement and find the views element; for subscribers, we have to get the channel and use a function we’ll create, __subFinder(channel), to get the sub count (if the channel is already stored, it simply returns the cached count). We then check each video against our requirements and, if it passes, add its URL to self.links. Finally, we set self.links_gathered = True.
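The requirement check at the end of that loop boils down to a small predicate. Here is a standalone version; the function and parameter names are my own, not necessarily the original’s:

```python
def passes_filters(views, subs, max_views=None, max_subs=None):
    """Return True when a video stays under the configured view and sub limits."""
    if max_views is not None and views > max_views:
        return False  # too many views
    if max_subs is not None and subs > max_subs:
        return False  # channel too large
    return True
```

A None limit means “don’t filter on this”, which matches the optional view/sub parameters described above.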
Now let’s create our __subFinder function.
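The snippet itself isn’t reproduced in this extract, but the caching idea behind __subFinder can be sketched independently of Selenium (the fetch callable stands in for the real page lookup, and the names are assumptions):

```python
def sub_finder(channels, channel, fetch):
    """Return a channel's sub count, caching results so each channel is fetched once."""
    if channel in channels:   # already seen: skip the slow page visit
        return channels[channel]
    subs = fetch(channel)     # fetch() performs the actual Selenium lookup
    channels[channel] = subs  # remember it for next time
    return subs
```

This is why self.channels makes the scraper faster: repeat channels never trigger a second page load.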
Next, the timer function. Here we set our count to 60 seconds, and if sub count is enabled we add another 60 seconds, because fetching sub counts takes longer. That budgets roughly 50 links per minute, or 50 links per two minutes with sub counts; once the budget runs out, the scraper returns whatever links it has gathered so far.
Earlier you saw another function being called: cstn. That’s a function we created to convert string numbers like “5k” or “50,000” into integers. Here’s what the function looks like.
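The original snippet isn’t reproduced in this extract; a converter like cstn can be sketched as follows (which suffixes it supports is my assumption):

```python
def cstn(value):
    """Convert a string number such as '5k', '1.2M', or '50,000' into an int."""
    value = value.strip().replace(",", "")       # drop thousands separators
    multipliers = {"k": 1_000, "m": 1_000_000, "b": 1_000_000_000}
    suffix = value[-1].lower()
    if suffix in multipliers:
        # '5k' -> 5 * 1000, '1.2M' -> 1.2 * 1_000_000
        return int(float(value[:-1]) * multipliers[suffix])
    return int(float(value))
```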
Now let’s get to the email-scraping part. We’ll use multiprocessing to make it faster and more efficient, taking advantage of our computer’s processing power.
Here we create our worker function. It pulls a URL’s page from the queue and uses a regex to look for an email, then calls our fix_email function to repair any broken emails, so that, for example, techysavage@gmail… would become techysavage@gmail.com. Then we save the result to our manager dictionary and release the lock. Below is the fix_email function, which tries to patch up any minor mistakes.
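The regex step can be sketched like this; the pattern below is a common email regex, not necessarily the exact one the original code uses:

```python
import re

# A common email pattern -- matches name@domain.tld
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")

def extract_emails(page_text):
    """Return the email-looking strings found in a page's text, duplicates removed."""
    # dict.fromkeys keeps first-seen order while dropping repeats
    return list(dict.fromkeys(EMAIL_RE.findall(page_text)))
```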
# ----- Fix Email Function ----- #
def fix_email(email):
    endings = ['.com', '.us', '.ca', '.ge', '.uk', '.it']  # For different types of domain endings
    # Loop through endings; if one appears mid-string, trim the junk after it
    for e in endings:
        if e in email and not email.endswith(e):
            return email.split(e)[0] + e
    return email  # Nothing to fix
Finally, we have our run function, which handles it all. First we check whether the given CPU count is a digit; if not, we check whether the parameter says full or half. Half uses half of the logical processors (roughly 50% CPU usage), whereas full uses them all. We create a semaphore lock so that only that specific number of logical processors is used at once. Then we create our Scrape object and run it. Once it finishes, we call our start_processes function and run the workers. When everything is done, we have our emails saved, without duplicates, in a set, and we build a return object containing the total time and the emails in a dictionary.
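That CPU-count parsing can be sketched as a small helper (the function name and the exact fallback behavior are my assumptions):

```python
import multiprocessing

def resolve_cpu_count(param):
    """Turn 'full', 'half', or a digit string into a worker-process count."""
    total = multiprocessing.cpu_count()  # number of logical processors
    if str(param).isdigit():
        return max(1, int(param))        # explicit count, at least 1
    if param == "full":
        return total                     # use every logical processor
    if param == "half":
        return max(1, total // 2)        # roughly 50% CPU usage
    raise ValueError(f"unexpected CPU count parameter: {param!r}")
```

The result would then size the semaphore that caps how many worker processes run at once.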
Thank you for reading; I hope you’ve learned something new and that this code helps. You can check out the code on my GitHub or copy the full code below. 👍 https://github.com/Ashura-R/Youtube-Email-Scraper/