Hoy en día, la data está creciendo y se acumula más rápido que antes. Actualmente, alrededor del 90% de la data generada en nuestro mundo fue generada en los dos últimos años. Debido a este crecimiento en tasa, las plataformas big data tuvieron que adoptar soluciones radicales para poder mantener volúmenes tan grandes de data.
Una de las fuentes de data más significativas hoy en día son las redes sociales. Permíteme demostrar un ejemplo de la vida real: manejando, analizando y extrayendo información de la data de las redes sociales en tiempo real, usando una de las soluciones eco en big data más importantes que existen—Apache Spark y Python.
En este artículo, te enseñaré cómo construir una aplicación simple que lee transmisiones online de Twitter usando Python, luego procesa los tweets usando Apache Spark Streaming para identificar los hashtags y, finalmente, regresa los hashtags en tendencia más importantes y representa esta data en el panel de control en tiempo real.
Creando Tus Propias Credenciales para las API de Twitter
Para poder obtener tweets de Twitter, debes registrarte en TwitterApps dándole clic en “Crea una nueva aplicación” y luego de llenar la planilla que se encuentra abajo, da clic en “Crea tu aplicación Twitter.”
Segundo, dirígete a tu aplicación recién creada y abre la ventana “Identificadores de Acceso y Claves”. Luego da clic en “Generar mi identificador de acceso”.
Tus nuevos identificadores de acceso aparecerán como se ven abajo.
Y ahora estás listo para el próximo paso.
Construir la HTTP Cliente de Twitter
En este paso, te mostraré cómo construir un cliente simple que obtendrá los tweets de la API de Twitter usando Python y luego éste las pasa a la instancia Spark Streaming. Debería ser fácil de seguir para cualquier desarrollador Python profesional.
Primero, vamos a crear un archivo llamado twitter_app.py
y luego vamos a agregar el código juntos como se ve abajo.
Importa las bibliotecas que vamos a usar como se ve abajo:
import socket
import sys
import requests
import requests_oauthlib
import json
Y agrega las variables que serán usadas en OAuth para conectarse con Twitter como se ve abajo:
# Reemplaza los valores de abajo con los tuyos
ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN'
ACCESS_SECRET = 'YOUR_ACCESS_SECRET'
CONSUMER_KEY = 'YOUR_CONSUMER_KEY'
CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET'
my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
Ahora, vamos a crear una nueva función llamada get_tweets
que llamará la URL API de Twitter y regresará la respuesta para una cadena de tweets.
def get_tweets():
url = 'https://stream.twitter.com/1.1/statuses/filter.json'
query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')]
query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data])
response = requests.get(query_url, auth=my_auth, stream=True)
print(query_url, response)
return response
Luego, crea una función que toma la respuesta de la vista arriba y extrae el texto de los tweets del objeto JSON de los tweets completos. Después de esto, envía cada tweet a la instancia Spark Streaming (se discutirá más adelante) a través de una conexión TCP.
def send_tweets_to_spark(http_resp, tcp_connection):
for line in http_resp.iter_lines():
try:
full_tweet = json.loads(line)
tweet_text = full_tweet['text']
print("Tweet Text: " + tweet_text)
print ("------------------------------------------")
tcp_connection.send(tweet_text + '\n')
except:
e = sys.exc_info()[0]
print("Error: %s" % e)
Ahora haremos la parte principal. Esta hará que la aplicación aloje las conexiones socket, con las que luego se conectará Spark. Vamos a configurar la IP aquí para que sea localhost
ya que todo se ejecutará en la misma máquina y en el puerto 9009
. Luego, vamos a llamar al método get_tweets
, el cual hicimos arriba, para obtener los tweets de Twitter y pasar su respuesta con la conexión socket a send_tweets_to_spark
para enviar los tweets a Spark.
TCP_IP = "localhost"
TCP_PORT = 9009
conn = None
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((TCP_IP, TCP_PORT))
s.listen(1)
print("Waiting for TCP connection...")
conn, addr = s.accept()
print("Connected... Starting getting tweets.")
resp = get_tweets()
send_tweets_to_spark(resp, conn)
Instalando Nuestra Aplicación Apache Spark Streaming
Vamos a construir nuestra aplicación Spark Streaming, la cual hará procesamiento en tiempo real para los tweets entrantes, extraerá hashtags de estos y calculará cuantos hashtags han sido mencionados.
Primero, tenemos que crear una instancia Spark Context sc
, luego creamos Streaming Context ssc
de sc
con un intervalo de dos segundos que realizará la transformación en todas las transmisiones recibidas cada dos segundos. Nota que fijamos el nivel de registro a ERROR
para poder inhabilitar la mayoría de los registros que escribe Spark.
Definimos un punto de control aquí para poder permitir un control periódico RDD; esto es obligatorio que sea utilizado en nuestra aplicación ya que usaremos transformaciones de contra fuegos con estado (será discutido más adelante en la misma sección).
Luego definimos nuestro DStream dataStream principal, el cual conectará el servidor socket que creamos anteriormente en el puerto 9009
y leerá los tweets desde ese puerto. Cada récord en el DStream será un tweet.
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SQLContext
import sys
import requests
# crea una configuración spark
conf = SparkConf()
conf.setAppName("TwitterStreamApp")
# crea un contexto spark con la configuración anterior
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
# crea el Contexto Streaming desde el contexto spark visto arriba con intervalo de 2 segundos
ssc = StreamingContext(sc, 2)
# establece un punto de control para permitir la recuperación de RDD
ssc.checkpoint("checkpoint_TwitterApp")
# lee data del puerto 9009
dataStream = ssc.socketTextStream("localhost",9009)
Ahora, vamos a definir nuestra lógica de transformación. Primero, vamos a dividir todos los tweets en palabras y los colocamos en palabras RDD. Luego, pasamos por un filtro solo hashtags de todas las palabras y los trazamos a la par del (hashtag, 1)
y los ponemos en hashtags RDD.
Luego debemos calcular cuantas veces se ha mencionado el hashtag. Podemos hacer eso usando la función reduceByKey
. Esta función calculará cuántas veces se ha mencionado el hashtag por cada grupo, es decir, va a resetear la cuenta en cada grupo.
En nuestro caso, necesitamos calcular las cuentas en todos los grupos, así que usaremos otra función llamada updateStateByKey
ya que esta función te permite mantener el estado de RDD mientras que lo actualiza con una nueva data. Esta forma se llama Stateful Transformation
.
Nota que para poder usar updateStateByKey
, debes configurar un punto de control y lo que se hizo en el paso anterior.
# divide cada Tweet en palabras
words = dataStream.flatMap(lambda line: line.split(" "))
# filtra las palabras para obtener solo hashtags, luego mapea cada hashtag para que sea un par de (hashtag,1)
hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))
# agrega la cuenta de cada hashtag a su última cuenta
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
# procesa cada RDD generado en cada intervalo
tags_totals.foreachRDD(process_rdd)
# comienza la computación de streaming
ssc.start()
# espera que la transmisión termine
ssc.awaitTermination()
updateStateByKey
toma una función como un parámetro llamado la función update
. Ésta se ejecuta en cada ítem en RDD y realiza la lógica deseada.
En nuestro caso, hemos creado una función de actualización llamada aggregate_tags_count
que sumará todos los new_values
(nuevos valores) para cada hashtag y los agregará a la total_sum
(suma total), que es la suma de todos los grupos y guarda la data en RDD tags_totals
.
def aggregate_tags_count(new_values, total_sum):
return sum(new_values) + (total_sum or 0)
Después, hacemos procesamiento en RDD tags_totals
en cada grupo para poder convertirlo en tabla temporal usando Spark SQL Context y luego de esto, realizar una declaración para poder tomar los diez mejores hashtags con sus cuentas y ponerlos dentro del marco de data hashtag_counts_df
.
def get_sql_context_instance(spark_context):
if ('sqlContextSingletonInstance' not in globals()):
globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
return globals()['sqlContextSingletonInstance']
def process_rdd(time, rdd):
print("----------- %s -----------" % str(time))
try:
# obtén el contexto spark sql singleton desde el contexto actual
sql_context = get_sql_context_instance(rdd.context)
# convierte el RDD a Row RDD
row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1]))
# crea un DF desde el Row RDD
hashtags_df = sql_context.createDataFrame(row_rdd)
# Registra el marco de data como tabla
hashtags_df.registerTempTable("hashtags")
# obtén los 10 mejores hashtags de la tabla utilizando SQL e imprímelos
hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10")
hashtag_counts_df.show()
# llama a este método para preparar los 10 mejores hashtags DF y envíalos
send_df_to_dashboard(hashtag_counts_df)
except:
e = sys.exc_info()[0]
print("Error: %s" % e)
El último paso en nuestra aplicación Spark es enviar el marco de data hashtag_counts_df
a la aplicación de panel de control. Así, convertiremos el marco de data en dos matrices, una para los hashtags y otra para sus cuentas. Luego, enviaremos a la aplicación de panel de control a través de la API REST.
def send_df_to_dashboard(df):
# extrae los hashtags del marco de data y conviértelos en una matriz
top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()]
# extrae las cuentas del marco de data y conviértelos en una matriz
tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()]
# inicia y envía la data a través de la API REST
url = 'http://localhost:5001/updateData'
request_data = {'label': str(top_tags), 'data': str(tags_count)}
response = requests.post(url, data=request_data)
Finalmente, aquí se ve una muestra de la salida de Spark Streaming mientras se ejecuta e imprime el hashtag_counts_df
. Notarás que la salida se imprime exactamente cada dos segundos por cada intervalo de grupo.
Crea un Panel de Control Simple de Tiempo Real para Representar la Data
Ahora, vamos a crear una aplicación de panel de control simple que se actualizará en tiempo real por Spark. La vamos a construir usando Python, Flask y Charts.js.
Primero, vamos a crear un proyecto Python con la estructura vista abajo, descarga y agrega el archivo Chart.js en el directorio estático.
Luego, en el archivo app.py
, vamos a crear una función llamada update_data
, la cual será llamada por Spark a través de la URL http://localhost:5001/updateData
para poder actualizar las etiquetas Globales y matrices de valores.
De igual modo, la función refresh_graph_data
es creada para ser llamada por la petición AJAX para regresar las nuevas etiquetas actualizadas y matrices de valores como JSON. La función get_chart_page
dejará la página chart.html
cuando sea llamada.
from flask import Flask,jsonify,request
from flask import render_template
import ast
app = Flask(__name__)
labels = []
values = []
@app.route("/")
def get_chart_page():
global labels,values
labels = []
values = []
return render_template('chart.html', values=values, labels=labels)
@app.route('/refreshData')
def refresh_graph_data():
global labels, values
print("labels now: " + str(labels))
print("data now: " + str(values))
return jsonify(sLabel=labels, sData=values)
@app.route('/updateData', methods=['POST'])
def update_data():
global labels, values
if not request.form or 'data' not in request.form:
return "error",400
labels = ast.literal_eval(request.form['label'])
values = ast.literal_eval(request.form['data'])
print("labels received: " + str(labels))
print("data received: " + str(values))
return "success",201
if __name__ == "__main__":
app.run(host='localhost', port=5001)
Ahora vamos a crear una gráfica simple en el archivo chart.html
para poder mostrar la data hashtag y actualizarlos en tiempo real. Como se define abajo, necesitamos importar las bibliotecas JavaScript, Chart.js
y jquery.min.js
.
En el cuerpo de la etiqueta, debemos crear un lienzo y otorgarle una identificación para poder hacer referencia a éste mientras se muestra la gráfica al usar JavaScript en el próximo paso.
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8"/>
<title>Top Trending Twitter Hashtags</title>
<script src='static/Chart.js'></script>
<script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script>
</head>
<body>
<h2>Top Trending Twitter Hashtags</h2>
<div style="width:700px;height=500px">
<canvas id="chart"></canvas>
</div>
</body>
</html>
Ahora vamos a crear la gráfica usando el código JavaScript debajo. Primero, tomamos el elemento del lienzo y luego creamos un nuevo objeto de gráfica y pasamos a éste el elemento del lienzo y definimos el objeto de data como se ve abajo.
Nota que las etiquetas de data están unidas con etiquetas y variables de valores que se regresan mientras dejan la página, al llamar a la función get_chart_page
en el archivo app.py
.
La última parte es la función que se configura para hacer una petición Ajax cada segundo y llamar a la URL /refreshData
, la cual ejecutará refresh_graph_data
en app.py
y regresará la nueva data actualizada y luego actualiza la gráfica que deja la nueva data.
<script>
var ctx = document.getElementById("chart");
var myChart = new Chart(ctx, {
type: 'horizontalBar',
data: {
labels: [{% for item in labels %}
"{{item}}",
{% endfor %}],
datasets: [{
label: '# of Mentions',
data: [{% for item in values %}
{{item}},
{% endfor %}],
backgroundColor: [
'rgba(255, 99, 132, 0.2)',
'rgba(54, 162, 235, 0.2)',
'rgba(255, 206, 86, 0.2)',
'rgba(75, 192, 192, 0.2)',
'rgba(153, 102, 255, 0.2)',
'rgba(255, 159, 64, 0.2)',
'rgba(255, 99, 132, 0.2)',
'rgba(54, 162, 235, 0.2)',
'rgba(255, 206, 86, 0.2)',
'rgba(75, 192, 192, 0.2)',
'rgba(153, 102, 255, 0.2)'
],
borderColor: [
'rgba(255,99,132,1)',
'rgba(54, 162, 235, 1)',
'rgba(255, 206, 86, 1)',
'rgba(75, 192, 192, 1)',
'rgba(153, 102, 255, 1)',
'rgba(255, 159, 64, 1)',
'rgba(255,99,132,1)',
'rgba(54, 162, 235, 1)',
'rgba(255, 206, 86, 1)',
'rgba(75, 192, 192, 1)',
'rgba(153, 102, 255, 1)'
],
borderWidth: 1
}]
},
options: {
scales: {
yAxes: [{
ticks: {
beginAtZero:true
}
}]
}
}
});
var src_Labels = [];
var src_Data = [];
setInterval(function(){
$.getJSON('/refreshData', {
}, function(data) {
src_Labels = data.sLabel;
src_Data = data.sData;
});
myChart.data.labels = src_Labels;
myChart.data.datasets[0].data = src_Data;
myChart.update();
},1000);
</script>
Ejecutar las aplicaciones juntas
Vamos a ejecutar las tres aplicaciones en el orden de abajo: 1. Twitter App Client. 2. Spark App. 3. Dashboard Web App.
Luego puedes acceder al panel de control en tiempo real buscando la URL <http://localhost:5001/>
Ahora puedes ver tu gráfica siendo actualizada abajo:
Usos de la Vida Real de Apache Streaming
Hemos aprendido a hacer análisis de data simple en data en tiempo real usando Spark Streaming e integrándolo directamente con un simple panel de control, usando un servicio web RESTful. De este ejemplo, podemos ver lo poderoso que es Spark, ya que captura una transmisión de data masiva, la transforma y extrae información valiosa que puede ser usada fácilmente para tomar decisiones en poco tiempo. Hay muchos casos de uso útiles, los cuales se pueden implementar y pueden servir a diferentes industrias, como noticias o mercadeo.
Ejemplo de industria de noticias
Podemos rastrear los hashtags mencionados con más frecuencia para saber de qué temas está hablando la gente en las redes sociales. También podemos rastrear hashtags específicos y sus tweets para saber qué dice la gente sobre temas específicos o eventos en el mundo.
Ejemplo de Mercadeo
Podemos recolectar la transmisión de tweets y, al hacer un análisis de opinión, categorizarlos y determinar los intereses de las personas para poder llevarles ofertas relacionadas a sus intereses.
También, hay muchos casos de usos que se pueden aplicar específicamente para los análisis big data y pueden servir a muchas industrias. Para conocer más casos de usos Apache Spark en general, sugiero que revises uno de nuestros posts anteriores.
Recomiendo que leas más sobre Spark Streaming aquí para conocer más sobre sus capacidades y hacer una transformación más avanzada de data para más información en tiempo real al usarlo.
Artículo vía TopTal