これまで何度かArelleの使い方やxbrl の構造について紹介してきました。今回はこれらの知識を用いながら、EDINET上の有価証券報告書から必要な情報を取り出し、決算情報のデータベースを作る方法を紹介します。
EDINETの公開期間は標準の場合5年、延長の場合10年となりますが、自分でデータベースを作っておけば、公開期間が終了した後のデータも参照可能です。
長期のバックテストを行いたい場合などには、必須となるデータになりますが、多くのデータ提供サービスでは、無料の範囲だと3年から5年までの財務データしか取得できません。自分で財務データを収集しておけば、無料でこれらのデータを活用できます。
全体構成
まずはじめに全体の大まかな構成を説明しておきます。
基本的な流れは以下のようになります。
1. 指定した範囲の日時から、その期間に公開された有価証券報告書の一覧を取得する
2. 現在も閲覧可能な状態であり、上場している企業の有価証券報告書(銘柄コードの存在する)を抽出する
3. 抽出した対象の有価証券報告書をダウンロードする
4. ダウンロードした有価証券報告書の xbrl ファイルを解析して、数値データを取得する。
5. 数値に対応するqnameやcontextID, label,親子関係などを整理してCSV に保存する
本格的にデータベースとして扱う場合には、CSV に保存するよりも、MySQLなどのデータベースに登録する方が良いと考えられます。
今回は、簡単に実装できることを重視して、CSV 保存 としました。
全体のコードは一番最後にGoogleColabへのリンクを載せていますので、そちらを参照してください。
それではそれぞれの詳細を見ていきます。
有価証券報告書の一覧取得
まずは指定した期間の有価証券報告書一覧を取得します。これは以前の記事でも扱っている内容になります。
# 指定した日付範囲内の日付リストを生成する関数
def make_day_list(start_date, end_date):
# 開始日から終了日までの日数を計算
period = (end_date - start_date).days
# 開始日から終了日までの日付リストを生成して返す
day_list = [start_date + datetime.timedelta(days=d) for d in range(period + 1)]
return day_list
# EDINETから企業情報を取得する関数
def make_doc_id_list(day_list):
# 企業情報を格納するリスト
securities_report_data = []
# 各日付についてEDINET APIからデータを取得
for day in day_list:
url = "https://disclosure.edinet-fsa.go.jp/api/v2/documents.json"
params = {"date": day, "type": 2,"Subscription-Key":api_key}
res = requests.get(url, params=params)
json_data = res.json()
# 取得したデータから特定の条件を満たすものを抽出し、リストに追加
for result in json_data["results"]:
ordinance_code = result["ordinanceCode"]
form_code = result["formCode"]
if ordinance_code == "010" and form_code == "030000":
securities_report_data.append(result)
time.sleep(1)
# 抽出した企業情報のリストを返す
return securities_report_data
from dateutil.relativedelta import relativedelta
start_date = datetime.date(2015,6,26)
end_date = start_date + relativedelta(years=4)
# 日付範囲から日付リストを作成する関数を呼び出し
day_list = make_day_list(start_date, end_date)
この中から、現在も閲覧できる状態にあり、銘柄コードが存在しているものだけを抽出します。
閲覧できなくなっているものlegalStatus=0
となります
securities_report_data = make_doc_id_list(2024/12/23)
# 取得した企業情報をデータフレームに格納
df = pd.DataFrame(securities_report_data)
print(date,len(df))
if len(df) == 0:
continue
df = df.sort_values('seqNumber')
df_ex = df[(~(df['secCode'].isnull())) & (df['legalStatus']!='0') & (df['xbrlFlag']=='1')]
対象有価証券報告書のダウンロード
対象となる有価証券報告書をダウンロードしてzipを解凍します
def download_zip_file(doc_id):
url = f"https://disclosure.edinet-fsa.go.jp/api/v2/documents/{doc_id}"
params = {"type": 1,"Subscription-Key":api_key}
filename = f"{doc_id}.zip"
# 解凍先のディレクトリのパス
extract_dir = filename.replace('.zip','')
res = requests.get(url, params=params, stream=True)
if res.status_code == 200:
with open(filename, 'wb') as file:
for chunk in res.iter_content(chunk_size=1024):
file.write(chunk)
# print(f"ZIP ファイル {filename} のダウンロードが完了しました。")
# ZipFileオブジェクトを作成
zip_obj = zipfile.ZipFile(filename, 'r')
# 解凍先ディレクトリを作成(存在しない場合)
if not os.path.exists(extract_dir):
os.makedirs(extract_dir)
# 全てのファイルを解凍
zip_obj.extractall(extract_dir)
# ZipFileオブジェクトをクローズ
zip_obj.close()
# print(f"ZIP ファイル {filename} を解凍しました。")
else:
print("ZIP ファイルのダウンロードに失敗しました。")
Arelleを使って解析をするのでファイル全体を読み込みます。
download_zip_file(doc_id)
xbrl_file = glob.glob(f'{doc_id}/XBRL/PublicDoc/*.xbrl')[0]
cols = ['Concept','Facts']
ctrl = Cntlr.Cntlr(logFileName='logToPrint')
modelXbrl = ctrl.modelManager.load(xbrl_file)
ここまでの内容はすでに別の記事などでも紹介している内容となりますがここから先は少し複雑になります。
データの整形
します有価証券報告書をArelleで読み込んだら必要な情報だけを取り出していきます。そのためには、ArelleのViewFileFactTableをベースとして独自で作成したクラスを使います。
from arelle.XbrlConst import conceptNameLabelRole, standardLabel, terseLabel, documentationLabel
class MyViewFacts(ViewFileFactTable.ViewFacts):
def __init__(self, modelXbrl, outfile, arcrole, linkrole, linkqname, arcqname, ignoreDims, showDimDefaults, labelrole, lang, cols,col_num=1,label_cell=None):
super().__init__(modelXbrl, outfile, arcrole, linkrole, linkqname, arcqname, ignoreDims, showDimDefaults, labelrole, lang, cols)
self.data = []
def viewConcept(self, concept, modelObject, labelPrefix, preferredLabel, n, relationshipSet, visited):
# bad relationship could identify non-concept or be None
if (not isinstance(concept, ModelDtsObject.ModelConcept) or
concept.substitutionGroupQname == XbrlConst.qnXbrldtDimensionItem):
return
cols = ['' for i in range(self.numCols)]
i = 0
for col in self.cols:
if col == "Facts":
self.setRowFacts(cols,concept,preferredLabel)
i = self.numCols - (len(self.cols) - i - 1) # skip to next concept property column
else:
if col in ("Concept", "Label"):
cols[i] = labelPrefix + concept.label(preferredLabel,lang=self.lang,linkroleHint=relationshipSet.linkrole)
i += 1
attr = {"concept": str(concept.qname)}
self.addRow(cols, treeIndent=n,
xmlRowElementName="facts", xmlRowEltAttr=attr, xmlCol0skipElt=True)
self.add_content(concept, modelObject)
if concept not in visited:
visited.add(concept)
for i, modelRel in enumerate(relationshipSet.fromModelObject(concept)):
nestedRelationshipSet = relationshipSet
targetRole = modelRel.targetRole
if self.arcrole in XbrlConst.summationItems:
childPrefix = "({:0g}) ".format(modelRel.weight) # format without .0 on integer weights
elif targetRole is None or len(targetRole) == 0:
targetRole = relationshipSet.linkrole
childPrefix = ""
else:
nestedRelationshipSet = self.modelXbrl.relationshipSet(self.arcrole, targetRole, self.linkqname, self.arcqname)
childPrefix = "(via targetRole) "
toConcept = modelRel.toModelObject
if toConcept in visited:
childPrefix += "(loop)"
labelrole = modelRel.preferredLabel
if not labelrole or self.labelrole == conceptNameLabelRole:
labelrole = self.labelrole
self.viewConcept(toConcept, modelRel, childPrefix, labelrole, n + 1, nestedRelationshipSet, visited)
visited.remove(concept)
def add_content(self, concept, modelObject):
if concept.isNumeric:
label = concept.label(lang='ja')
s_label = concept.label(preferredLabel=standardLabel, lang='ja')
facts = self.conceptFacts[concept.qname]
if isinstance(modelObject, ModelDtsObject.ModelRelationship):
parent_name = modelObject.fromModelObject.qname
parent_label = modelObject.fromModelObject.label(preferredLabel=standardLabel, lang='ja')
else:
parent_name = None
parent_label = None
if isinstance(modelObject, str):
link_def = self.linkRoleDefintions[modelObject]
elif isinstance(modelObject, ModelDtsObject.ModelRelationship):
link_def = self.linkRoleDefintions[modelObject.linkrole]
for f in facts:
if f.unit is not None:
unit = f.unit.value
else:
unit = None
value = f.xValue
context = f.context
self.data.append([concept.qname,concept.typeQname,concept.name, label, s_label, parent_name, parent_label,value, context.startDatetime, context.endDatetime, unit, link_def,context.id])
def viewFacts(modelXbrl, outfile, arcrole=None, linkrole=None, linkqname=None, arcqname=None, ignoreDims=False, showDimDefaults=False, labelrole=None, lang=None, cols=None,col_num=1, label_cell=None):
if not arcrole: arcrole=XbrlConst.parentChild
view = MyViewFacts(modelXbrl, outfile, arcrole, linkrole, linkqname, arcqname, ignoreDims, showDimDefaults, labelrole, lang, cols,col_num, label_cell)
view.view(modelXbrl.modelDocument)
df = pd.DataFrame(view.data, columns=['Name','Type','LocalName','Label','StandardLabel','ParentName','ParentLabel', 'Value','StartDate','EndDate','Unit','LinkDefinition','ContextID'])
view.close()
return pd.DataFrame(df)
このクラスを用いることで、各要素の親子関係を反映したデータを取得することができます。
if concept.isNumeric:
とすることによって、数値データのみを保存するようにします。テキストデータも含めて保存したい場合はここは削除します。
ある要素(fact)がどういうものであるのか認識するには、qname(concept情報も含む)とcontext情報が必要になります。
conceptにはラベルなどの情報が入っており、contextにはいつのデータかといった情報が入っています。
従って最低でもCSV に記録する上では、qname,context, valueを保存する必要があります。
実際の利用の上では、ラベル、date, valueあたりが必要になってくると思います。
ただし日本語のラベルは重複する可能性があるので、それらを処理する上では、親要素のラベルも必要になる場合があります。(例えば、その他という項目に対して親要素が営業費用なのか営業利益なのかで意味が変わってくるので、ラベルのみを使う場合には、親要素も記録しておく必要があるということです)
CSV への保存
保存容量の観点から考えると、なるべく保存するデータは少なくしたいです。
そこで、今回は、複数の決算報告書で共通すると考えられるconcept関連のデータについては(つまり、ラベル情報や親要素情報)別途辞書として保存し、それ以外の、qname, value, startdate, enddate, contextid部分とdocIDをCSV に記録していきます。
conceptの保存のための関数は以下になります。
def make_dict(df, concept_dict=None):
if concept_dict is None:
# defaultdictの初期化
concept_dict = defaultdict(lambda: {'StandardLabel': None, 'ParentName': None, 'ParentLabel':None})
# dfのデータをdefaultdictに登録
for _, row in df.iterrows():
name = str(row['Name'])
concept_dict[name]['StandardLabel'] = row['StandardLabel']
concept_dict[name]['ParentName'] = str(row['ParentName'])
concept_dict[name]['ParentLabel'] = row['ParentLabel']
else:
# dfのデータをチェックして追加
for idx, row in df.iterrows():
name = str(row['Name'])
std_label = row['StandardLabel']
parent_name = str(row['ParentName'])
name_to_check = name
name_conflict = False
if name_to_check in concept_dict:
if (concept_dict[name_to_check]['StandardLabel'] == std_label and
concept_dict[name_to_check]['ParentName'] == parent_name):
# 名前が既に存在し、StandardLabelとParentNameが同じ場合
name_conflict = False
else:
# 名前が既に存在し、StandardLabelまたはParentNameが異なる場合
name_conflict = True
name_to_check = name + '_new'
count = 1
while name_to_check in concept_dict:
if (concept_dict[name_to_check]['StandardLabel'] == std_label and
concept_dict[name_to_check]['ParentName'] == parent_name):
# 名前が既に存在し、StandardLabelとParentNameが同じ場合
name_conflict = False
break
else:
name_to_check = f"{name}_new{count}"
count += 1
if name_conflict:
# 新しい名前を登録
concept_dict[name_to_check]['StandardLabel'] = std_label
concept_dict[name_to_check]['ParentName'] = parent_name
concept_dict[name_to_check]['ParentLabel'] = row['ParentLabel']
df.loc[idx, 'Name'] = name_to_check
else:
# 名前が既に存在し、StandardLabelとParentNameが同じ場合
concept_dict[name]['StandardLabel'] = std_label
concept_dict[name]['ParentName'] = parent_name
concept_dict[name]['ParentLabel'] = row['ParentLabel']
return concept_dict, df
concept_file_path = '/content/drive/MyDrive/EDINET/concept_dict.pkl'
def load_concept():
if os.path.exists(concept_file_path)==False:
# 新しい辞書を作成
concept_dict = defaultdict(lambda: {'StandardLabel': None, 'ParentName': None, 'ParentLabel':None})
# ファイルから読み込む
else:
with open(concept_file_path, 'rb') as file:
loaded_normal_dict = pickle.load(file)
# 普通の辞書をdefaultdictに変換
concept_dict = defaultdict(lambda: {'StandardLabel': None, 'ParentName': None, 'ParentLabel':None}, loaded_normal_dict)
return concept_dict
def save_concept(concept_dict):
# # defaultdictを普通の辞書に変換
normal_dict = dict(concept_dict)
with open(concept_file_path, 'wb') as file:
pickle.dump(normal_dict, file)
concept_dict = load_concept()
また、ドキュメント情報も参照できるようにするためメタデータとして書類一覧の取得結果も保存します。
meta_cols = ['date','seqNumber', 'docID', 'edinetCode', 'secCode', 'JCN', 'filerName', 'fundCode', 'ordinanceCode', 'formCode', 'docTypeCode', 'periodStart', 'periodEnd', 'submitDateTime']
def make_metafile(meta_csv_file):
if os.path.exists(meta_csv_file)==False:
df_header = pd.DataFrame(columns=meta_cols)
df_header.to_csv(meta_csv_file, index=False)
date_prev = None
seq_num = 0
else:
df_meta = pd.read_csv(meta_csv_file)
if len(df_meta) > 0:
seq_num = df_meta.iloc[-1,:]['seqNumber']
date_prev = df_meta.iloc[-1,:]['date']
date_prev = datetime.datetime.strptime(date_prev, '%Y-%m-%d')
else:
date_prev = None
seq_num = 0
return date_prev, seq_num
途中で実行が停止してしまった場合でも、続きからダウンロードを開始できるように、最も最後に読み込まれているデータを確認して、それ以降のデータを保存するようにしておきます。
今回は Google Colabを使うので、Google ドライブにマウントして、Google ドライブ内に CSV を保存する形にします。
マイドライブの下にEDINETというフォルダーと、その下に、metaとmainというフォルダが存在する前提となっています。
今回は、1日ごとに1ファイルを作成することとしました。最後に年ごと程度にまとめてもいいかもしれません。
これらを踏まえた、実際の一連の流れを示したコードが以下になります。
# 日付範囲
from dateutil.relativedelta import relativedelta
start_date = datetime.date(2015,6,26)
end_date = start_date + relativedelta(years=4)
# 日付範囲から日付リストを作成する関数を呼び出し
day_list = make_day_list(start_date, end_date)
for date in day_list:
folder_year = date.year
meta_csv_file = f'/content/drive/MyDrive/EDINET/meta/{folder_year}.csv'
date_prev, seq_num = make_metafile(meta_csv_file)
if date_prev is not None:
if date_prev.date()>date:
continue
if os.path.exists(f'/content/drive/MyDrive/EDINET/main/{folder_year}')==False:
os.mkdir(f'/content/drive/MyDrive/EDINET/main/{folder_year}')
# 日付リストを利用してEDINETから企業情報を取得する関数を呼び出し
securities_report_data = make_doc_id_list(2024/12/23)
# 取得した企業情報をデータフレームに格納
df = pd.DataFrame(securities_report_data)
print(date,len(df))
if len(df) == 0:
continue
df = df.sort_values('seqNumber')
df_ex = df[(~(df['secCode'].isnull())) & (df['legalStatus']!='0') & (df['xbrlFlag']=='1')]
name = date.strftime('%Y%m%d')
if len(df_ex) > 0:
csv_file = f'/content/drive/MyDrive/EDINET/main/{folder_year}/{name}.csv'
if os.path.exists(csv_file)==False:
header = ['doc_id', 'Name', 'Value', 'StartDate', 'EndDate', 'Unit', 'ContextID']
df_header = pd.DataFrame(columns=header)
df_header.to_csv(csv_file, index=False)
counter = 0
for _, row in df_ex.iterrows():
if date_prev is not None:
if date_prev.date()==date and row['seqNumber'] <= seq_num:
continue
doc_id = row['docID']
download_zip_file(doc_id)
xbrl_file = glob.glob(f'{doc_id}/XBRL/PublicDoc/*.xbrl')[0]
cols = ['Concept','Facts']
ctrl = Cntlr.Cntlr(logFileName='logToPrint')
modelXbrl = ctrl.modelManager.load(xbrl_file)
data = viewFacts(modelXbrl, 'test.csv', cols=cols, lang='ja')
data = parse_df(data)
concept_dict,data = make_dict(data, concept_dict)
data['doc_id'] = doc_id
data[['doc_id','Name','Value','StartDate','EndDate','Unit','ContextID']].to_csv(csv_file, mode='a', index=False, header=False)
row['date'] = date.strftime('%Y-%m-%d')
pd.DataFrame(row).T.loc[:,meta_cols].to_csv(meta_csv_file, mode='a', index=False, header=False)
save_concept(concept_dict)
del_files(doc_id)
counter += 1
print(len(df_ex),counter)
まとめ
今回は、これまでのEDINETに関する知識を総動員して、独自の財務データベースを作成しました。
すべてのデータを保存しようとすると容量の観点で難しいですが、うまく必要なデータだけ抽出して、重複をなるべく減らすように工夫することで、個人でも保存できる程度のデータ容量に収まります。
これらのコードをAWS Lambdaなどに登録することで、定期自動実行で、ほぼメンテナンスフリーで財務データを収集できます。
今回使用したコードはこちらのGoogleColabから確認できます。