python-telegram-bot/telegram/inputfile.py

195 lines
5.9 KiB
Python
Raw Normal View History

#!/usr/bin/env python
# pylint: disable=W0622,E0611
2015-08-11 21:58:17 +02:00
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015-2016
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
2015-08-11 21:58:17 +02:00
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
2015-08-10 18:57:31 +02:00
"""This module contains a object that represents a Telegram InputFile"""
2015-08-10 18:57:31 +02:00
try:
from email.generator import _make_boundary as choose_boundary
from urllib.request import urlopen
from io import BufferedReader as file
except ImportError:
from mimetools import choose_boundary
from urllib2 import urlopen
import mimetypes
import os
import sys
import imghdr
from telegram import TelegramError
DEFAULT_MIME_TYPE = 'application/octet-stream'
USER_AGENT = 'Python Telegram Bot' \
2015-12-15 00:51:17 +01:00
' (https://github.com/python-telegram-bot/python-telegram-bot)'
class InputFile(object):
"""This object represents a Telegram InputFile."""
def __init__(self,
data):
self.data = data
self.boundary = choose_boundary()
2015-07-11 00:46:15 +02:00
if 'audio' in data:
self.input_name = 'audio'
self.input_file = data.pop('audio')
2015-07-11 00:46:15 +02:00
if 'document' in data:
self.input_name = 'document'
self.input_file = data.pop('document')
2015-07-11 00:46:15 +02:00
if 'photo' in data:
self.input_name = 'photo'
self.input_file = data.pop('photo')
2015-09-09 14:28:58 +02:00
if 'sticker' in data:
self.input_name = 'sticker'
self.input_file = data.pop('sticker')
2015-07-11 00:46:15 +02:00
if 'video' in data:
self.input_name = 'video'
self.input_file = data.pop('video')
if 'voice' in data:
self.input_name = 'voice'
self.input_file = data.pop('voice')
if 'certificate' in data:
self.input_name = 'certificate'
self.input_file = data.pop('certificate')
2015-07-11 00:46:15 +02:00
if isinstance(self.input_file, file):
self.input_file_content = self.input_file.read()
2015-09-08 20:52:10 +02:00
if 'filename' in data:
self.filename = self.data.pop('filename')
else:
self.filename = os.path.basename(self.input_file.name)
2015-07-13 23:53:30 +02:00
self.mimetype = mimetypes.guess_type(self.filename)[0] or \
DEFAULT_MIME_TYPE
2015-07-11 00:46:15 +02:00
if 'http' in self.input_file:
self.input_file_content = urlopen(self.input_file).read()
self.mimetype = InputFile.is_image(self.input_file_content)
2015-07-13 23:53:30 +02:00
self.filename = self.mimetype.replace('/', '.')
@property
def headers(self):
"""
Returns:
str:
"""
return {'User-agent': USER_AGENT,
'Content-type': self.content_type}
@property
def content_type(self):
"""
Returns:
str:
"""
return 'multipart/form-data; boundary=%s' % self.boundary
def to_form(self):
"""
Returns:
str:
"""
form = []
form_boundary = '--' + self.boundary
# Add data fields
for name, value in self.data.items():
form.extend([
form_boundary,
'Content-Disposition: form-data; name="%s"' % name,
'',
str(value)
])
# Add input_file to upload
form.extend([
form_boundary,
'Content-Disposition: form-data; name="%s"; filename="%s"' % (
self.input_name, self.filename
),
'Content-Type: %s' % self.mimetype,
'',
self.input_file_content
])
form.append('--' + self.boundary + '--')
form.append('')
return InputFile._parse(form)
@staticmethod
def _parse(form):
"""
Returns:
str:
"""
if sys.version_info > (3,):
# on Python 3 form needs to be byte encoded
encoded_form = []
for item in form:
try:
encoded_form.append(item.encode())
except AttributeError:
encoded_form.append(item)
return b'\r\n'.join(encoded_form)
return '\r\n'.join(form)
2015-07-13 23:53:30 +02:00
@staticmethod
def is_image(stream):
"""Check if the content file is an image by analyzing its headers.
Args:
stream (str): A str representing the content of a file.
Returns:
str: The str mimetype of an image.
"""
image = imghdr.what(None, stream)
if image:
return 'image/%s' % image
raise TelegramError('Could not parse file content')
@staticmethod
def is_inputfile(data):
"""Check if the request is a file request.
Args:
data (str): A dict of (str, unicode) key/value pairs
Returns:
bool
"""
if data:
file_types = ['audio', 'document', 'photo', 'sticker', 'video',
2015-09-09 14:28:58 +02:00
'voice', 'certificate']
file_type = [i for i in list(data.keys()) if i in file_types]
if file_type:
file_content = data[file_type[0]]
2015-07-13 23:53:30 +02:00
2015-07-20 03:05:22 +02:00
if file_type[0] == 'photo' or file_type[0] == 'document':
return isinstance(file_content, file) or \
str(file_content).startswith('http')
2015-07-13 23:53:30 +02:00
return isinstance(file_content, file)
2015-07-13 23:53:30 +02:00
return False