EDINETデータを取得して財務データベースを作ろう(定期実行)

投資・ファイナンス

今回は Google CloudのCloud Functionsを使って、EDINETから取得した財務データを Google ドライブ 上に定期実行で保存していくコードを作成したいと思います。EDINETからDBを作る部分は以前紹介した記事を参照してください。

これができれば自分で毎回実行しなくても、自動でEDINETの財務諸表データを蓄積していくことができます。

Google Cloudのプロジェクトを作成済みである前提で進めていくので、まだ Google Cloudのプロジェクトを作成していない場合や、Google ドライブの API を使っていない場合には、以前の GoogleドライブAPIを使う方法の記事を確認してください。

Cloud Functionsの作成

まずは関数を作成していきます。
プロジェクトのトップページで、Cloud Functionsを検索します。
Cloud Functions
ファンクションを作成から、作成を行います。
ファンクションを作成

必要な API を有効化するように表示されるので有効にします。
API

まずは構成部分に関してです。関数名やリージョンは適宜設定してください。トリガータイプは今回は、Pub/Subにしています。トピック ID は任意の ID を入力し 登録しておいてください。
構成

ランタイムの環境設定にいくつか環境変数を入力します。
ランタイム環境変数
– EDINET_API:EDINETから取得する API です
– folder_id : ダウンロードしたデータを保存していくGoogleDriveフォルダーのfolder_idです。
– main_folder_id:folder_idの中にmainという別のフォルダを作成して、その下に詳細のデータを保存していていくので、folder_idのフォルダの中にもう一つフォルダを作成して、そのフォルダのidを取得します

フォルダ ID 周りのことがよくわからない場合には Google ドライブ API の使い方を紹介した 別記事をご参照ください。

続いてコード部分の設定になります。
ランタイムは Python を選択します。
エントリーポイントは変更しても良いですか今回はこのままにしています。
コード

コードのmain.pyに以下を記述します。

#Google Cloud
import base64
import functions_framework

import io
import csv
import json
import pandas as pd
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
import os
import json
import requests
import pandas as pd
import datetime
import zipfile
import time
import pickle
from collections import defaultdict
from google.oauth2 import service_account
from googleapiclient.discovery import build
from arelle import Cntlr, ViewFileFactTable, ModelDtsObject, XbrlConst
from arelle.XbrlConst import conceptNameLabelRole, standardLabel, terseLabel, documentationLabel
import shutil
import os
from dateutil.relativedelta import relativedelta
# Google Drive APIの認証情報を設定する
SCOPES = ['https://www.googleapis.com/auth/drive']
SERVICE_ACCOUNT_FILE = 'credentials.json'  # サービスアカウントキーのJSONファイルパス
main_folder_id = os.environ.get("main_folder_id ")
folder_id = os.environ.get("folder_id ")
api_key = os.environ.get("EDINET_API")

def authenticate_drive():
    creds = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE, scopes=SCOPES)
    service = build('drive', 'v3', credentials=creds)
    return service

service = authenticate_drive()

def search_folder_in_drive(service, folder_id, folder_name):
    # 指定されたフォルダIDの中で指定された名前のフォルダを検索し、存在すればそのIDを返す
    response = service.files().list(q=f"'{folder_id}' in parents and name='{folder_name}' and mimeType='application/vnd.google-apps.folder'",
                                    spaces='drive',
                                    fields='files(id)').execute()
    files = response.get('files', [])
    if files:
        return files[0]['id']
    else:
        return create_folder_in_drive(service, folder_name, parent_folder_id=folder_id)

def create_folder_in_drive(service, folder_name, parent_folder_id=None):
    # 指定された名前の新しいフォルダを作成し、作成したフォルダのIDを返す
    file_metadata = {
        'name': folder_name,
        'mimeType': 'application/vnd.google-apps.folder'

    }
    if parent_folder_id:
        file_metadata['parents'] = [parent_folder_id]

    folder = service.files().create(body=file_metadata, fields='id').execute()
    return folder.get('id')


def download_file_from_drive(service, folder_id, file_name, local_file_path):
    # Google Driveから指定されたフォルダのCSVファイルをダウンロードする
    results = service.files().list(q=f"name='{file_name}' and '{folder_id}' in parents",
                                   spaces='drive',
                                   fields='files(id)').execute()
    file_id = results.get('files', [])[0].get('id')

    request = service.files().get_media(fileId=file_id)
    fh = io.FileIO(local_file_path, mode='wb')
    downloader = MediaIoBaseDownload(fh, request)
    done = False
    while done is False:
        status, done = downloader.next_chunk()

def search_file(service, folder_id, file_name):
    results = service.files().list(q=f"name='{file_name}' and '{folder_id}' in parents",
                                   spaces='drive',
                                   fields='files(id)').execute()
    files = results.get('files', [])
    return files
def update_data(service, folder_id, file_name, data_row=None):
    # Google Driveに指定されたフォルダのCSVファイルにデータを追記する


    # Google Driveに同名のファイルが存在するかチェック
    results = service.files().list(q=f"name='{file_name}' and '{folder_id}' in parents",
                                   spaces='drive',
                                   fields='files(id)').execute()
    files = results.get('files', [])
    if files:
        if data_row is not None:
            # 既存のファイルがある場合は、その内容を取得して追記する
            if os.path.exists(file_name)==False:

                download_file_from_drive(service, folder_id, file_name, file_name)
            data_row.to_csv(file_name,mode='a',header=False,index=False)
        media_body = MediaFileUpload(file_name,
                                     mimetype='text/csv',
                                     resumable=True)
        request = service.files().update(fileId=files[0]['id'],
                                         media_body=media_body)
        request.execute()


    else:
        # 新規作成
        if data_row is not None:
            data_row.to_csv(file_name,index=False)
        file_metadata = {
            'name': file_name,
            'parents': [folder_id]
        }
        media = MediaFileUpload(file_name,
                                mimetype='text/csv',
                                resumable=True)
        file = service.files().create(body=file_metadata,
                                      media_body=media,
                                      fields='id').execute()
        return file.get('id')


def make_day_list(start_date, end_date):
    # 開始日から終了日までの日数を計算
    period = (end_date - start_date).days


    # 開始日から終了日までの日付リストを生成して返す
    day_list = [start_date + datetime.timedelta(days=d) for d in range(period + 1)]
    return day_list

# EDINETから企業情報を取得する関数

def make_doc_id_list(day_list):
    # 企業情報を格納するリスト
    securities_report_data = []

    # 各日付についてEDINET APIからデータを取得
    for day in day_list:
        url = "https://disclosure.edinet-fsa.go.jp/api/v2/documents.json"
        params = {"date": day, "type": 2,"Subscription-Key":api_key}

        res = requests.get(url, params=params)
        json_data = res.json()

        # 取得したデータから特定の条件を満たすものを抽出し、リストに追加
        for result in json_data["results"]:
            ordinance_code = result["ordinanceCode"]
            form_code = result["formCode"]

            if ordinance_code == "010" and form_code == "030000":
                securities_report_data.append(result)
        time.sleep(1)
    # 抽出した企業情報のリストを返す
    return securities_report_data
def download_zip_file(doc_id):
    url = f"https://disclosure.edinet-fsa.go.jp/api/v2/documents/{doc_id}"
    params = {"type": 1,"Subscription-Key":api_key}
    filename = f"{doc_id}.zip"
    # 解凍先のディレクトリのパス
    extract_dir = filename.replace('.zip','')

    res = requests.get(url, params=params, stream=True)
    if res.status_code == 200:
        with open(filename, 'wb') as file:
            for chunk in res.iter_content(chunk_size=1024):
                file.write(chunk)
        # print(f"ZIP ファイル {filename} のダウンロードが完了しました。")
        # ZipFileオブジェクトを作成
        zip_obj = zipfile.ZipFile(filename, 'r')

        # 解凍先ディレクトリを作成(存在しない場合)

        if not os.path.exists(extract_dir):
            os.makedirs(extract_dir)
        # 全てのファイルを解凍
        zip_obj.extractall(extract_dir)
        # ZipFileオブジェクトをクローズ
        zip_obj.close()
        # print(f"ZIP ファイル {filename} を解凍しました。")
    else:
        print("ZIP ファイルのダウンロードに失敗しました。")

def del_files(doc_id):
        # フォルダの削除
    folder_name = doc_id
    shutil.rmtree(folder_name)

    # ファイルの削除
    file_name = f"{doc_id}.zip"
    os.remove(file_name)



    # フォルダが存在するか確認
    if os.path.exists(doc_id):
        # フォルダごと削除
        shutil.rmtree(doc_id)


from arelle.XbrlConst import conceptNameLabelRole, standardLabel, terseLabel, documentationLabel
class MyViewFacts(ViewFileFactTable.ViewFacts):
    def __init__(self, modelXbrl, outfile, arcrole, linkrole, linkqname, arcqname, ignoreDims, showDimDefaults, labelrole, lang, cols,col_num=1,label_cell=None):
        super().__init__(modelXbrl, outfile, arcrole, linkrole, linkqname, arcqname, ignoreDims, showDimDefaults, labelrole, lang, cols)
        self.data = []

    def viewConcept(self, concept, modelObject, labelPrefix, preferredLabel, n, relationshipSet, visited):
        # bad relationship could identify non-concept or be None
        if (not isinstance(concept, ModelDtsObject.ModelConcept) or
            concept.substitutionGroupQname == XbrlConst.qnXbrldtDimensionItem):
            return
        cols = ['' for i in range(self.numCols)]
        i = 0
        for col in self.cols:
            if col == "Facts":
                self.setRowFacts(cols,concept,preferredLabel)
                i = self.numCols - (len(self.cols) - i - 1) # skip to next concept property column
            else:
                if col in ("Concept", "Label"):
                    cols[i] = labelPrefix + concept.label(preferredLabel,lang=self.lang,linkroleHint=relationshipSet.linkrole)


                i += 1

        attr = {"concept": str(concept.qname)}
        self.addRow(cols, treeIndent=n,
                    xmlRowElementName="facts", xmlRowEltAttr=attr, xmlCol0skipElt=True)
        self.add_content(concept, modelObject)
        if concept not in visited:
            visited.add(concept)
            for i, modelRel in enumerate(relationshipSet.fromModelObject(concept)):
                nestedRelationshipSet = relationshipSet
                targetRole = modelRel.targetRole
                if self.arcrole in XbrlConst.summationItems:
                    childPrefix = "({:0g}) ".format(modelRel.weight) # format without .0 on integer weights
                elif targetRole is None or len(targetRole) == 0:
                    targetRole = relationshipSet.linkrole
                    childPrefix = ""
                else:
                    nestedRelationshipSet = self.modelXbrl.relationshipSet(self.arcrole, targetRole, self.linkqname, self.arcqname)
                    childPrefix = "(via targetRole) "
                toConcept = modelRel.toModelObject
                if toConcept in visited:
                    childPrefix += "(loop)"
                labelrole = modelRel.preferredLabel
                if not labelrole or self.labelrole == conceptNameLabelRole:
                    labelrole = self.labelrole
                self.viewConcept(toConcept, modelRel, childPrefix, labelrole, n + 1, nestedRelationshipSet, visited)
            visited.remove(concept)
    def add_content(self, concept, modelObject):
        if concept.isNumeric:
            label = concept.label(lang='ja')
            s_label = concept.label(preferredLabel=standardLabel, lang='ja')
            facts = self.conceptFacts[concept.qname]

            if isinstance(modelObject, ModelDtsObject.ModelRelationship):
                parent_name = modelObject.fromModelObject.qname
                parent_label = modelObject.fromModelObject.label(preferredLabel=standardLabel, lang='ja')
            else:
                parent_name = None
                parent_label = None
            if isinstance(modelObject, str):
                link_def = self.linkRoleDefintions[modelObject]
            elif isinstance(modelObject, ModelDtsObject.ModelRelationship):
                link_def = self.linkRoleDefintions[modelObject.linkrole]
            for f in facts:
                if f.unit is not None:
                    unit = f.unit.value
                else:
                    unit = None
                value = f.xValue
                context = f.context
                self.data.append([concept.qname,concept.typeQname,concept.name, label, s_label, parent_name, parent_label,value, context.startDatetime, context.endDatetime, unit, link_def,context.id])



def viewFacts(modelXbrl, outfile, arcrole=None, linkrole=None, linkqname=None, arcqname=None, ignoreDims=False, showDimDefaults=False, labelrole=None, lang=None, cols=None,col_num=1, label_cell=None):

    if not arcrole: arcrole=XbrlConst.parentChild
    view = MyViewFacts(modelXbrl, outfile, arcrole, linkrole, linkqname, arcqname, ignoreDims, showDimDefaults, labelrole, lang, cols,col_num, label_cell)

    view.view(modelXbrl.modelDocument)
    df = pd.DataFrame(view.data, columns=['Name','Type','LocalName','Label','StandardLabel','ParentName','ParentLabel', 'Value','StartDate','EndDate','Unit','LinkDefinition','ContextID'])
    view.close()
    return pd.DataFrame(df)

def parse_df(df):
    if len(df) > 0:
        df = df[df['Value'].isnull()==False]
        df = df.drop_duplicates(['Name','ContextID','Value'])
        return df[['Name','ParentName','StandardLabel','Value','StartDate','EndDate','Unit','ContextID','ParentLabel']]

def make_dict(df, concept_dict=None):
    if concept_dict is None:
        # defaultdictの初期化
        concept_dict = defaultdict(lambda: {'StandardLabel': None, 'ParentName': None, 'ParentLabel':None})

        # dfのデータをdefaultdictに登録
        for _, row in df.iterrows():
            name = str(row['Name'])
            concept_dict[name]['StandardLabel'] = row['StandardLabel']
            concept_dict[name]['ParentName'] = str(row['ParentName'])
            concept_dict[name]['ParentLabel'] = row['ParentLabel']
    else:
        # dfのデータをチェックして追加
        for idx, row in df.iterrows():
            name = str(row['Name'])
            std_label = row['StandardLabel']
            parent_name = str(row['ParentName'])

            name_to_check = name
            name_conflict = False

            if name_to_check in concept_dict:
                if (concept_dict[name_to_check]['StandardLabel'] == std_label and
                    concept_dict[name_to_check]['ParentName'] == parent_name):
                    # 名前が既に存在し、StandardLabelとParentNameが同じ場合
                    name_conflict = False
                else:
                    # 名前が既に存在し、StandardLabelまたはParentNameが異なる場合
                    name_conflict = True
                    name_to_check = name + '_new'
                    count = 1
                    while name_to_check in concept_dict:
                        if (concept_dict[name_to_check]['StandardLabel'] == std_label and
                            concept_dict[name_to_check]['ParentName'] == parent_name):
                            # 名前が既に存在し、StandardLabelとParentNameが同じ場合
                            name_conflict = False
                            break
                        else:
                            name_to_check = f"{name}_new{count}"
                            count += 1

            if name_conflict:
                # 新しい名前を登録
                concept_dict[name_to_check]['StandardLabel'] = std_label
                concept_dict[name_to_check]['ParentName'] = parent_name
                concept_dict[name_to_check]['ParentLabel'] = row['ParentLabel']
                df.loc[idx, 'Name'] = name_to_check
            else:
                # 名前が既に存在し、StandardLabelとParentNameが同じ場合
                concept_dict[name]['StandardLabel'] = std_label
                concept_dict[name]['ParentName'] = parent_name
                concept_dict[name]['ParentLabel'] = row['ParentLabel']

    return concept_dict, df


meta_cols = ['date','seqNumber', 'docID', 'edinetCode', 'secCode', 'JCN', 'filerName', 'fundCode', 'ordinanceCode', 'formCode', 'docTypeCode', 'periodStart', 'periodEnd', 'submitDateTime']
def make_metafile(meta_csv_file):

    if search_file(service, folder_id, meta_csv_file):
        if os.path.exists(meta_csv_file)==False:
            download_file_from_drive(service, folder_id, meta_csv_file, meta_csv_file)
        df_meta = pd.read_csv(meta_csv_file)
        if len(df_meta) > 0:
            seq_num = df_meta.iloc[-1,:]['seqNumber']
            date_prev = df_meta.iloc[-1,:]['date']
            date_prev = datetime.datetime.strptime(date_prev, '%Y-%m-%d')

        else:
            date_prev = None
            seq_num = 0

    else:
        df_header = pd.DataFrame(columns=meta_cols)

        df_header.to_csv(meta_csv_file, index=False)
        update_data(service, folder_id, meta_csv_file)
        date_prev = None
        seq_num = 0

    return date_prev, seq_num

concept_file_path = 'concept_dict.pkl'
def load_concept():
    if os.path.exists(concept_file_path)==False:
        if search_file(service, folder_id, concept_file_path):
            download_file_from_drive(service, folder_id, concept_file_path, concept_file_path)
        else:
            concept_dict = defaultdict(lambda: {'StandardLabel': None, 'ParentName': None, 'ParentLabel':None})
            return concept_dict

    # ファイルから読み込む
    with open(concept_file_path, 'rb') as file:
        loaded_normal_dict = pickle.load(file)

    # 普通の辞書をdefaultdictに変換
    concept_dict = defaultdict(lambda: {'StandardLabel': None, 'ParentName': None, 'ParentLabel':None}, loaded_normal_dict)
    return concept_dict
def save_concept(concept_dict):
    # # defaultdictを普通の辞書に変換
    normal_dict = dict(concept_dict)


    with open(concept_file_path, 'wb') as file:
        pickle.dump(normal_dict, file)
    update_data(service, folder_id, concept_file_path)
header = ['doc_id', 'Name', 'Value', 'StartDate', 'EndDate', 'Unit', 'ContextID']

def main():


    concept_dict = load_concept()

    # 日付範囲
    meta_csv_file = '2024.csv'
    date_prev, seq_num = make_metafile(meta_csv_file)
    if date_prev is None:
        start_date = datetime.date(2024,7,1)
    else:
        start_date = date_prev.date()

    end_date =  datetime.datetime.today().date()

    # 日付範囲から日付リストを作成する関数を呼び出し
    day_list = make_day_list(start_date, end_date)
    for date in day_list:

        folder_year = date.year
        meta_csv_file = f'{folder_year}.csv'
        date_prev, seq_num = make_metafile(meta_csv_file)
        if date_prev is not None:
            if date_prev.date()>date:
                continue
        year_folder_id = search_folder_in_drive(service, main_folder_id, folder_year)



        # 日付リストを利用してEDINETから企業情報を取得する関数を呼び出し
        securities_report_data = make_doc_id_list(2024/09/20)

        # 取得した企業情報をデータフレームに格納
        df = pd.DataFrame(securities_report_data)
        print(date,len(df))
        if len(df) == 0:
            continue
        df = df.sort_values('seqNumber')

        df_ex = df[(~(df['secCode'].isnull())) & (df['legalStatus']!='0') & (df['xbrlFlag']=='1')]
        name = date.strftime('%Y%m%d')
        print(date,len(df_ex))



        if len(df_ex) > 0:
            csv_file = name + '.csv'



            counter = 0
            for _, row in df_ex.iterrows():

                if date_prev is not None:
                    if date_prev.date()==date and row['seqNumber'] <= seq_num:
                        continue
                doc_id = row['docID']
                download_zip_file(doc_id)

                file_list = os.listdir(f'{doc_id}/XBRL/PublicDoc')
                xbrl_files = [file for file in file_list if file.endswith('.xbrl')]
                xbrl_file = os.path.join(f'{doc_id}/XBRL/PublicDoc', xbrl_files[0])


                cols = ['Concept','Facts']
                ctrl = Cntlr.Cntlr(logFileName='logToPrint')
                modelXbrl = ctrl.modelManager.load(xbrl_file)
                data = viewFacts(modelXbrl, 'test.csv', cols=cols, lang='ja')
                data = parse_df(data)
                concept_dict,data = make_dict(data, concept_dict)
                data['doc_id'] = doc_id
                update_data(service, year_folder_id, csv_file, data[['doc_id','Name','Value','StartDate','EndDate','Unit','ContextID']])
                row['date'] = date.strftime('%Y-%m-%d')
                add_data = pd.DataFrame(row).T.loc[:,meta_cols]

                update_data(service, folder_id, meta_csv_file, add_data)
                save_concept(concept_dict)
                del_files(doc_id)
                counter += 1
            print(len(df_ex),counter)


# Triggered from a message on a Cloud Pub/Sub topic.
@functions_framework.cloud_event
def hello_pubsub(cloud_event):
    main()


requirements.txtは次のようにします。

functions-framework==3.*
arelle-release
google-auth
google-api-python-client
pandas
requests
datetime
google-cloud-storage
google-cloud-bigquery
zipfile36

また、認証情報を記入したcredentials.jsonというファイルを新しく作成して、中身に認証情報を記入しておきます。

これで準備は完了です。デプロイをして、テスト実行してみて、うまくログが出力されていれば成功です。
取得の開始日などは必要に応じて変更してください。

環境変数がうまく認識されない場合があります。その場合はあまり推奨されませんが行動に直接環境変数の値を入力することで、解決できます。

デプロイを行って、テストを行い、うまく登録されることを確認できたら、定期実行のためのスケジュールを設定していきます。
ログ

Cloud Scheduler

定期実行のためには,
Cloud Scheduler APIを使用することができます.
まずはコンソールで検索をし API を有効にします
検索

有効か
ジョブを作成から新しいジョブを作成します。

新しいジョブ
名前やリージョンは自由に設定をします。

今回は毎朝5時に実行するための頻度を入力しました。
「0 5 * * *」で5時に実行することができます

トリガー設定

ターゲットタイプは、Pub/Subにして作成したトピックを選択し保存します。

これで定期実行で自動的にデータ収集が可能になります。
時々メモリーサイズが足りないようなのでその場合にはメモリーサイズを大きくするなどして対応してください。

タイトルとURLをコピーしました