Hello guys ! Welcome to the second part of this tutorial series on Developing Interactive FTP Shell Using Ftplib and Multi-threading in Python. In the first part we have dealt with the ftp services and now in this part we will see how we can use the power of multithreading in python.We will also see how using Queues with Multithreading simplifies our job.

Before moving further let us have some foundation on Queues.

Queues

The Queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming where information must be exchanged safely between multiple threads.

Commonly used functions :

Queue.empty()

Return True if the queue is empty, False otherwise. If empty() returns True it doesn’t guarantee that a subsequent call to put() will not block. Similarly, if empty() returns False it doesn’t guarantee that a subsequent call to get() will not block.

Queue.put(item[block[timeout]])

Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot is available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.get([block[timeout]])

Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete. If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Queue.join()

Blocks until all items in the queue have been gotten and processed.The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

Now lets get started…

  1. Lets create a new file in our root directory and name it as app.py.
  2. Now lets import the libraries that we will be using in this program.
    from ftp_service import FtpServices
    import ftplib
    import threading
    import datetime
    from queue import Queue
    import os
  3. Now create a class called ThreadingFtp as below :
    class ThreadingFtp(object):
    
        def __init__(self):
            self.ftpService = None
    
        def login(self):
            self.ftpService = FtpServices(host="your_host", uname="your_username", upass="your_pass")
    
        def upload_files(self, q):
            while not q.empty():
                filename = q.get()[1]
                self.login()
                st = datetime.datetime.now()
                print("File upload started for "+filename+" at "+str(st))
                try:
                    self.ftpService.uploadFile(filename=filename)
                    et = datetime.datetime.now()
                    total_time = str((et-st).total_seconds())
                    print(filename+" uploaded successfully in "+total_time+" seconds")
                    q.task_done()
    
                except ftplib.all_errors:
                    print(ftplib.all_errors)
                    print("Uploading failed for "+filename)
                    q.task_done()
    
        def download_files(self, q):
            while not q.empty():
                filename = q.get()[1]
                self.login()
                st = datetime.datetime.now()
                print("File download started for "+filename+" at "+str(st))
                try:
                    self.ftpService.downloadFile(filename=filename)
                    et = datetime.datetime.now()
                    total_time = str((et-st).total_seconds())
                    print(filename+" downloaded successfully in "+total_time+" seconds")
                    q.task_done()
                except ftplib.all_errors:
                    print(ftplib.all_errors)
                    print("Downloading failed for " + filename)
                    q.task_done()
  4. We are done with the creation of the class and now lets proceed with implementation part. Write the below code on zero indentation in your app.py file.
    def main_app():
        while 1:
            mftp = ThreadingFtp()
            print(50*"#"+" Welcome to MFTP Console "+50*"#")
            print("Please select your task from below :")
            print("1. Upload Files")
            print("2. Download Files")
            print("3. Exit")
            user_input = input("Enter your option  : ")
    
            if user_input == "1":
                all_files = os.listdir('uploads')
                workers = []
                q = Queue(maxsize=0)
                num_threads = min(20, len(all_files))
                for i in range(len(all_files)):
                    q.put((i, all_files[i]))
    
                for i in range(num_threads):
                    worker = threading.Thread(target=mftp.upload_files, args=(q,))
                    workers.append(worker)
                    worker.start()
                q.join()
                for t in workers:
                    t.join()
    
            elif user_input == "2":
                all_files = ["file1.txt", "file2.txt", "file3.txt"]
                workers = []
                q = Queue(maxsize=0)
                num_threads = min(20, len(all_files))
                for i in range(len(all_files)):
                    q.put((i, all_files[i]))
    
                for i in range(num_threads):
                    worker = threading.Thread(target=mftp.download_files, args=(q,))
                    workers.append(worker)
                    worker.start()
                q.join()
                for t in workers:
                    t.join()
            elif user_input == "3":
                break
            else:
                print("You selected wrong option !")

    In this function we add the shell interaction by adding user input methods. Also, we instantiate our  `ThreadingFtp`  class and create threads and add tasks to the queue. The threads then pick the tasks from the queue unless all the tasks are accomplished.

  5. Now at last the key to the lock is still remaining. Add the below line with zero indentation in `app.py` file.
    if __name__ == "__main__":
        main_app()

That’s it ! Congratulations we have reached the end of this tutorial series.If you have any doubt related  to this tutorial you are free to leave your comment below and I will be happy to help you. As for now lets wrap it here, see you in the next tutorial.


Related Posts

2 thoughts on “Developing Interactive FTP Shell Using Ftplib and Multi-threading in Python (II)”

    1. Python has a very vast area of implication. Due to its compactness and modular formation it can be handy in most of the known fields like Datascience,Web Development,Game Development,Data mining,Machine learning,AI etc.

Leave a Reply

Your email address will not be published. Required fields are marked *