ホーム>source

800Kペアのドキュメントで4つの類似性(cosine_similarity、jaccard、Sequence Matcher類似性、jaccard_variants類似性)を測定しようとしています。

すべての文書ファイルはtxt形式で、約100KB〜300KB(約1500000文字)です。

Pythonスクリプトを高速化する方法に関して2つの質問があります。

マイパイソンスクリプト:

from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from difflib import SequenceMatcher
def get_tf_vectors(doc1, doc2):
    text = [doc1, doc2]
    vectorizer = CountVectorizer(text)
    vectorizer.fit(text)
    return vectorizer.transform(text).toarray()
def measure_sim(doc1, doc2):
    a, b = doc1.split(), doc2.split()
    c, d = set(a), set(b)
    vectors = [t for t in get_tf_vectors(doc1, doc2)]
    return cosine_similarity(vectors)[1][0], float(len(c&d) / len(c|d)), \
            1 - (sum(abs(vectors[0] - vectors[1])) / sum(vectors[0] + vectors[1])), \
            SequenceMatcher(None, a, b).ratio()
#items in doc_pair list are like('ID', 'doc1_directory', 'doc2_directory')
def data_analysis(doc_pair_list):
    result = {}
    for item in doc_pair_list:
        f1 = open(item[1], 'rb')
        doc1 = f1.read()
        f1.close()
        f2 = oepn(item[2], 'rb')
        doc2 = f2.read()
        f2.close()
        result[item[0]] = measure_sim(doc1, doc2)

ただし、このコードはCPUの10%しか使用せず、このタスクを完了するにはほぼ20日かかります。したがって、このコードをより効率的にする方法があるかどうかを尋ねたいと思います。

Q1。ドキュメントはHDDに保存されるため、これらのテキストデータの読み込みには時間がかかると思いました。したがって、コンピューターが類似性を計算するたびに2つの文書のみをロードするのは効率的ではないと考えられます。したがって、一度に50組のドキュメントをロードして、それぞれ類似度を計算してみます。役に立つでしょうか?

Q2。 「コードを高速に実行する方法」に関する投稿のほとんどは、Cコードに基づいたPythonモジュールを使用する必要があると述べています。しかし、私は非常に効率的であることが知られているsklearnモジュールを使用しているので、もっと良い方法があるのだろうかと思います。

このPythonスクリプトがより多くのコンピュータリソースを使用して高速化するのに役立つ方法はありますか?

あなたの答え
  • 解決した方法 # 1

    より良い解決策があるかもしれませんが、類似性のカウントがブロッカーである場合は、次のようなものを試すことができます。 1)すべてのファイルを1つずつ読み取り、multiprocessing.Queueに入れる別のプロセス 2)類似性を数え、結果をmultiprocessing.Queueに入れるための複数のワーカープロセスのプール。 3)次に、メインスレッドは単にresults_queueから結果を読み込み、現在のように辞書に保存します。

    ハードウェアの制限(CPUコアの数と速度、RAMサイズ、ディスクの読み取り速度)がわかりません。テストするサンプルもありません。 編集:以下に説明するコードを提供します。速くなっているかどうかを確認して、教えてください。メインブロッカーがファイルの読み込みである場合、より多くのローダープロセスを作成できます(たとえば、2つのプロセスとそれぞれがファイルの半分を読み込みます)。ブロッカーが類似度を計算している場合は、さらにワーカープロセスを作成できます(worker_countを変更するだけです)。最後に、「結果」はすべての結果を含む辞書です。

       import multiprocessing
        import os
        from difflib import SequenceMatcher
        from sklearn.feature_extraction.text import CountVectorizer
        from sklearn.metrics.pairwise import cosine_similarity
    
        def get_tf_vectors(doc1, doc2):
            text = [doc1, doc2]
            vectorizer = CountVectorizer(text)
            vectorizer.fit(text)
            return vectorizer.transform(text).toarray()
    
        def calculate_similarities(doc_pairs_queue, results_queue):
            """ Pick docs from doc_pairs_queue and calculate their similarities, save the result to results_queue. Repeat infinitely (until process is terminated). """
            while True:
                pair = doc_pairs_queue.get()
                pair_id = pair[0]
                doc1 = pair[1]
                doc2 = pair[2]
                a, b = doc1.split(), doc2.split()
                c, d = set(a), set(b)
                vectors = [t for t in get_tf_vectors(doc1, doc2)]
                results_queue.put((pair_id, cosine_similarity(vectors)[1][0], float(len(c&d) / len(c|d)),
                    1 - (sum(abs(vectors[0] - vectors[1])) / sum(vectors[0] + vectors[1])),
                    SequenceMatcher(None, a, b).ratio()))
    
        def load_files(doc_pair_list, loaded_queue):
            """
            Pre-load files and put them to a queue, so working processes can get them.
            :param doc_pair_list: list of files to be loaded (ID, doc1_path, doc2_path)
            :param loaded_queue: multiprocessing.Queue that will hold pre-loaded data
            """
            print("Started loading files...")
            for item in doc_pair_list:
                with open(item[1], 'rb') as f1:
                    with open(item[2], 'rb') as f2:
                        loaded_queue.put((item[0], f1.read(), f2.read()))  # if queue is full, this automatically waits until there is space
            print("Finished loading files.")
    
        def data_analysis(doc_pair_list):
            # create a loader process that will pre-load files (it does no calculations, so it loads much faster)
            # loader puts loaded files to a queue; 1 pair ~ 500 KB, 1000 pairs ~ 500 MB max size of queue (RAM memory)
            loaded_pairs_queue = multiprocessing.Queue(maxsize=1000)
            loader = multiprocessing.Process(target=load_files, args=(doc_pair_list, loaded_pairs_queue))
            loader.start()
            # create worker processes - these will do all calculations
            results_queue = multiprocessing.Queue(maxsize=1000)  # workers put results to this queue
            worker_count = os.cpu_count() if os.cpu_count() else 2  # number of worker processes
            workers = []  # create list of workers, so we can terminate them later
            for i in range(worker_count):
                worker = multiprocessing.Process(target=calculate_similarities, args=(loaded_pairs_queue, results_queue))
                worker.start()
                workers.append(worker)
            # main process just picks the results from queue and saves them to the dictionary
            results = {}
            i = 0  # results counter
            pairs_count = len(doc_pair_list)
            while i < pairs_count:
                res = results_queue.get(timeout=600)  # timeout is just in case something unexpected happened (results are calculated much quicker)
                # Queue.get() is blocking - if queue is empty, get() waits until something is put into queue and then get it
                results[res[0]] = res[1:]  # save to dictionary by ID (first item in the result)
            # clean up the processes (so there aren't any zombies left)
            loader.terminate()
            loader.join()
            for worker in workers:
                worker.terminate()
                worker.join()
    
    

    結果について教えてください、私はそれに非常に興味があり、必要に応じてさらに支援します;)

  • 解決した方法 # 2

    最初に行うべきことは、実際のボトルネックを見つけることができるかどうかを確認することです。cProfileを使用すると、疑念が確認されたり、問題を明らかにしたりできると思います。

    次のようにcProfileを使用して、コードを変更せずに実行できるはずです。

    python -m cProfile -o profiling-results python-file-to-test.py
    
    

    その後、次のようなpstatsを使用して結果を分析できます。

    import pstats
    stats = pstats.Stats("profiling-results")
    stats.sort_stats("tottime")
    stats.print_stats(10)
    
    

    コードのプロファイリングについて詳しくは、Marco Bonazaninのブログ記事「My Python Code is Slow?プロファイリングのヒント

  • 前へ java - JPAクエリ:サブクエリをグループ化条件に結合する
  • 次へ Scala Sparkで2つのデータフレームを結合する