Pythonでニコ生アラートを作る(放送通知サーバをクラス化する)

Python

前回までのスクリプトでニコ生の放送通知を受け取ることが出来たので、今回はもう少し使いやすくするためにクラス化したいと思います。

クラス化にあたっては、放送通知サーバの稼動状態によって動作を変えたいので、GoFデザインパターンのStateパターンを意識しています。

エラー処理はほとんど未実装なので実際はもう少し手をいれる必要があります。

まずは放送通知サーバです。

# -*- coding: utf-8 -*-

'''ニコ生の放送通知サーバクラス'''

from Socket import Socket
from urllib2 import urlopen
import xml.etree.ElementTree as Xml

from AlertState import Suspend


class AlertServer:
	def __init__(self):
		self.socket = Socket(self)
		self.state  = Suspend()

		# サーバステータス
		self.status_dict = {
			'status': None,
			'code': None
		}

		self._url       = 'http://live.nicovideo.jp/api/getalertinfo'
		self._show_data = False

		self.check_state()

	def check_state(self):
		'''放送通知サーバの状態チェック'''
		xml = self._get_xml_from_url(url=self._url)
		self._set_socket_data(xml=xml)

		self.state.check_state(server=self)

	def change_state(self, state):
		pre_state = self.state.__class__.__name__
		now_state = state.__class__.__name__

		log_string  = 'Server state changed from ' + pre_state
		log_string += ' to ' + now_state + '.'
		print log_string

		self.state = state
		self.connect()

	def connect(self):
		self.state.connect(server=self)

	def disconnect(self):
		self.state.disconnect(server=self)

	def show(self):
		print 'Display received data.'
		self._show_data = True

	def hide(self):
		print 'Hide received data.'
		self._show_data = False

	def alert_event(self, alert_event):
		'''放送通知受信時の処理'''
		if self._show_data:
			print 'Alert received:' + str(alert_event)

	def data_received_event(self, event):
		'''放送通知以外のデータ受信時'''
		if self._show_data:
			print 'Data received:' + event

	def _get_xml_from_url(self, url):
		'''URLからデータを取得してXML要素オブジェクトにする'''
		response = urlopen(url)
		xml_string = response.read()
		xml = Xml.fromstring(xml_string)
		return xml

	def _set_socket_data(self, xml):
		'''XML要素オブジェクトから必要な情報をセット'''
		status = xml.get('status')

		self.status_dict['status'] = status

		if status == 'ok':		# 放送中
			self.socket.address = xml.findtext('ms/addr')
			self.socket.port    = xml.findtext('ms/port')
			self.socket.thread  = xml.findtext('ms/thread')
		elif status == 'fail':
			code = xml.findtext('.//code')
			self.status_dict['code'] = code

前回までのスクリプトとの違いは、socketが受信した放送通知とそれ以外のデータをコチラで受け取って処理するためのメソッド(?)を追加しています。

今はprint文で画面表示してるだけですが、ニコ生アラートを作るときにはココをオーバーライドして放送通知対象との突き合わせを行えば良さそうです。

次は放送通知サーバの状態を表すクラスです。

# -*- coding: utf-8 -*-

from threading import Timer


class AlertState:
	'''放送通知サーバの状態を表すスーパークラス'''
	def check_state(self, server):
		"""
		status	= 'ok'     : 配信中
				= それ以外 : 停止中"""
		if server.status_dict['status'] == 'ok':
			server.change_state(Working())
		else:
			server.change_state(Suspend())

	def connect(self, server):
		raise NotImplementedError

	def disconnect(self, server):
		raise NotImplementedError


class Working(AlertState):
	'''サーバ稼働中クラス'''

	def connect(self, server):
		server.socket.connect()

	def disconnect(self, server):
		server.socket.disconnect()


class Suspend(AlertState):
	'''サーバ停止中クラス'''
	def __init__(self):
		self._retry_thread = None

	def connect(self, server):
		'''180秒後に再接続'''
		self._retry_thread = Timer(180, server.check_state)
		self._retry_thread.start()

		output_message = 'Connect after 180 seconds.'
		print output_message

	def disconnect(self, server):
		'''再接続スレッドを停止'''
		self._retry_thread.cancel()
		print 'Retry canceled.'

pythonの動的型付け(ダック・タイピング)を考えるとちょっと無駄なコトしてますが、一応読みやすさを考慮してこんな感じにしています。

放送通知サーバが停止中の場合は180秒後に再接続するようにしています(39行目)。

最後に放送通知サーバへのソケット接続を行うクラスです。

# -*- coding: utf-8 -*-

from threading import Thread, Event
import socket

from SocketEvent import EventSelector


class Socket(object):
	'''Socket通信クラス'''
	def __init__(self, sender):
		self._server = sender
		self.address = None
		self.port    = None
		self.thread  = None

		self._socket  = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

		self._receive_thread    = None
		# 受信スレッド停止用
		self._thread_stop_event = Event()

	def connect(self):
		self._socket.connect((self.address, int(self.port)))
		# リクエスト用のメッセージ
		post_message = '<thread thread="{0}" version="20061206" res_from="-1"/>\0'
		post_message = post_message.format(self.thread)

		self.send(post_message)

	def disconnect(self):
		'''受信スレッドを停止'''
		try:
			# スレッドが停止するのを待つ
			self._thread_stop_event.set()
			self.receive_thread.join()

			self._socket.close()

			log_string = 'Thread stopped.'
			print log_string
		except Exception as e:
			log_string  = 'type: ' + str(type(e))
			log_string += 'message: ' + e.message
			print log_string

	def send(self, post_message):
		total_sent = 0
		while total_sent < len(post_message):
			sent = self._socket.send(post_message[total_sent:])
			if sent == 0:
				raise RuntimeError('socket connection broken')
			total_sent += sent

		# メッセージ受信スレッド
		self.receive_thread = Thread(target=self._data_receive)
		self.receive_thread.start()

	def _data_receive(self):
		"""メッセージ受信
		外部イベントを受け取ることで停止"""
		while not self._thread_stop_event.is_set():
			# 上限4KBで受信
			recv_stream = self._socket.recv(4096)
			if recv_stream == '':
				# 受信データが無いとき
				raise RuntimeError('socket connection broken')
			else:
				# 1件ごとに切り出し
				receive_data_list = recv_stream.split('\0')
				for xml_string in receive_data_list:
					if len(xml_string) != 0:
						# イベント発行
						eventor = EventSelector(xml_string)
						eventor.make_event(self)

	def alert_event(self, alert_dict):
		self._server.alert_event(alert_dict)

	def thread_result_event(self, result_string):
		self._server.data_received_event(result_string)

前回までのスクリプトとの違いは

  1. 無限ループだったデータ受信部分を別スレッドにしたこと(56行目)
  2. データ受信部分のスレッドを外部から停止出来るようにしたこと(62行目)
  3. 受信データによって違った処理を行えるようにしたこと(74行目)

くらいでしょうか。

2.のスレッドを外部から停止する方法についてはコチラの記事を参考にさせて頂きました。

3.の部分については、XMLデータの要素毎に処理を切り替えるクラスを作り、その結果をsocketに返すようにイベントチックな感じにしました。

この辺はダック・タイピングで楽させて貰ってますw

alerteventメソッド、threadresult_eventメソッドを更にサーバに送ってるんですが、この辺はもっと綺麗にした方が良いかもですね。

XMLデータを要素ごとに切り替えるクラスはこんな感じにしています。

# -*- coding: utf-8 -*-

import xml.etree.ElementTree as Xml


class EventSelector(object):
	'''Socket受信データを振り分けるクラス'''
	def __init__(self, xml_string):
		self._xml     = Xml.fromstring(xml_string)
		self._eventor = None

		self.select_eventor()

	def select_eventor(self):
		if self._xml.tag == 'thread':
			self._eventor = EventThread(self._xml)
		elif self._xml.tag == 'chat':
			self._eventor = EventChat(self._xml)

	def make_event(self, socket):
		self._eventor.make_event(socket)


class EventChat(object):
	'''<chat thread= を処理するクラス'''
	def __init__(self, xml):
		self._xml = xml

	def make_event(self, socket):
		'''放送開始イベントを発行'''
		alert_string = self._xml.text
		alert_events = alert_string.split(',')
		try:
			alert_dict  = {
				'live_id': 'lv' + alert_events[0],
				'community_id': alert_events[1],
				'user_id': alert_events[2]
			}
		except IndexError:
			error_msg  = 'IndexError:\n'
			error_msg += 'reveive data: '
			error_msg += self._xml.text
			print error_msg
			pass

		socket.alert_event(alert_dict)


class EventThread(object):
	'''<thread resultcode= を処理するクラス'''
	def __init__(self, xml):
		self._xml = xml

	def make_event(self, socket):
		result_code = self._xml.get('resultcode')
		if result_code == '0':
			result_name = 'FOUND'
		elif result_code == '1':
			result_name = 'NOT_FOUND'
		elif result_code == '2':
			result_name = 'INVALID'
		elif result_code == '4':
			result_name = 'INVALID'
		elif result_code == '5':
			result_name = 'TOO_ODD_WAYBACKKEY'
		elif result_code == '4':
			result_name = 'INVALID_WAYBACKKEY'
		elif result_code == '6':
			result_name = 'INVALID_ADMINKEY'
		elif result_code == '7':
			result_name = 'TOO_ODD_ADMINKEY'
		elif result_code == '8':
			result_name = 'INVALID_THREADKEY'
		elif result_code == '9':
			result_name = 'TOO_ODD_THREADKEY'
		elif result_code == '10':
			result_name = 'NOT_IMPLEMENTED'
		elif result_code == '11' or result_code == '12':
			result_name = 'LEAF_NOT_ACTIVATE'
		elif result_code == '13':
			result_name = 'LANGUAGE_NOT_FOUND'
		else:
			result_name = ''
		receive_result = result_name + '(resultcode: ' + result_code + ')'
		# イベント発行
		socket.thread_result_event(receive_result)

もう少しスマートするべきなんですけど、この辺は面倒になってかなり手抜いてます…w


実際はエラー処理や放送通知以外の受信データも整形して処理した方が良いと思うのでもう少し手を入れる必要がありますが、一応これでも放送通知を受け取ることは出来ました。

次回は放送通知と通知対象との突き合わせを行う部分に移ります。

オブジェクト指向における再利用のためのデザインパターン改訂版 [ エリック・ガンマ ]
created by Rinker

コメント

タイトルとURLをコピーしました