airflow自定义mongodb查询页面

之前用的主数据库都是关系型数据库,而现在公司主要用的是mongo,我也是第一次用,很多东西都是现学现卖。而且各种框架对mongo这类NoSQL数据库的支持也不是很好,比如Django(ORM)和SQLAlchemy就不支持mongo。

由于做用户数据分析,所以需要一个data pipeline平台,而且线上和测试环境的mongo库不能外网连接,所以使用airflow来做ETL,并且可以通过 airflow的Ad Hoc Query页面来查询数据库数据,也方便debug。

虽然Ad Hoc Query中可以选择mongo的连接,但是一查询就报错。于是我怀疑airflow不支持mongo,但上网查也没人提这件事,于是去扒了扒airflow的源码。

我用的airflow版本是1.10.3,实现Ad Hoc Query的视图是www/views.py中第2193行的QueryView:

...
    # Airflow 1.10.3 QueryView excerpt: the query is executed through the
    # connection's hook, so the hook must provide get_pandas_df().
    hook = db.get_hook()
    try:
        # limit_sql presumably caps the result at QUERY_LIMIT rows using syntax
        # appropriate for db.conn_type — confirm in airflow/www/utils.py.
        df = hook.get_pandas_df(wwwutils.limit_sql(sql, QUERY_LIMIT, conn_type=db.conn_type))
        # df = hook.get_pandas_df(sql)
        has_data = len(df) > 0
        df = df.fillna('')
        # Render an HTML table only when the query returned rows.
        results = df.to_html(
            classes=[
                'table', 'table-bordered', 'table-striped', 'no-wrap'],
            index=False,
            na_rep='',
        ) if has_data else ''
    except Exception as e:
        # Any failure is shown as a flash message; with a MongoHook this is
        # where the missing get_pandas_df() surfaces as the page error.
        flash(str(e), 'error')
        error = True
...

然后我查看了mongo的Hook(contrib/hooks/mongo_hook.py),发现MongoHook没有实现get_pandas_df方法,所以airflow 1.10.3的Ad Hoc Query肯定不支持mongo。一开始我想重写MongoHook,再实现get_pandas_df,但又发现db.get_hook()对mongo连接写死了返回MongoHook,要想改只能改airflow源码,不能在plugins中重写。

class Connection(Base, LoggingMixin):
    ...
    def get_hook(self):
        ...
        # The hook class for each conn_type is chosen here, inside Airflow
        # itself: for 'mongo' it is always contrib's MongoHook, so a plugin
        # cannot swap in a hook that implements get_pandas_df().
        elif self.conn_type == 'mongo':
            from airflow.contrib.hooks.mongo_hook import MongoHook
            return MongoHook(conn_id=self.conn_id)
        ...

airflow自带的查询页面不支持mongo,于是我只好根据Ad Hoc Query自己撸一个mongo查询的页面

plugins结构

plugins
├── __init__.py
├── blueprints
│   ├── __init__.py
│   ├── mongo_query.py
│   └── templates
│       └── mongodb_query.html
└── hooks
    └── __init__.py

mongo查询页面及功能实现

plugins/blueprints/templates/mongodb_query.html

基本都是复制的Ad Hoc Query中template的内容

{% extends "airflow/master.html" %}
{#
  Mongodb Query page, adapted from Airflow's Ad Hoc Query template.
  Context: title, form (conn_id / collection / sql fields) and results
  (an HTML table rendered server-side, or an empty string).
#}

{% block title %}{{ title }}{% endblock %}

{% block head_css %}
{{ super() }}
<link rel="stylesheet" type="text/css"
    href="{{ url_for("static", filename="main.css") }}">
<link rel="stylesheet" type="text/css"
    href="{{ url_for("static", filename="dataTables.bootstrap.css") }}">
{% endblock %}

{% block body %}
  <h2>Mongodb Query</h2>
  <form method="post" id="query_form">
    <div class="form-inline">
        <input name="_csrf_token" type="hidden" value="{{ csrf_token() }}">
        {{ form.conn_id }}
        {{ form.collection }}
        <input type="submit" class="btn btn-default" value="Run!" id="submit_without_csv">
        <input type="submit" class="btn btn-default" value=".csv" id="submit_with_csv">
        <span id="results"></span><br>
        {# form.sql uses AceEditorWidget; the script below mirrors the editor into textarea[name="sql"], which is what gets posted. #}
        <div id='ace_container'>
            {{ form.sql(rows=10) }}
        </div>
    </div>
  </form>
  {# results is DataFrame.to_html() output (cell values escaped by pandas), so it is marked safe. #}
  {{ results|safe }}
{% endblock %}
{% block tail %}
  {{ super() }}
  <script src="{{ url_for('static', filename='ace.js') }}"></script>
  <script src="{{ url_for('static', filename='mode-sql.js') }}"></script>
  <script src="{{ url_for('static', filename='jquery.dataTables.min.js') }}"></script>
  <script src="{{ url_for('static', filename='theme-crimson_editor.js') }}"></script>
  <script>
    $( document ).ready(function() {
        var editor = ace.edit("sql");
        var textarea = $('textarea[name="sql"]').hide();
        // Copy the editor contents into the hidden textarea that the form posts.
        function sync() {
            textarea.val(editor.getSession().getValue());
        }
        editor.setTheme("ace/theme/crimson_editor");
        editor.setOptions({
            minLines: 3,
            maxLines: Infinity,
        });
        editor.getSession().setMode("ace/mode/sql");
        editor.getSession().on('change', sync);
        editor.focus();
        $('table.dataframe').dataTable({
            "scrollX": true,
            "iDisplayLength": 100,
            "aaSorting": [],
        });
        $('select').addClass("form-control");
        sync();
        // Show a spinner while a query runs. This must be bound to the <form>:
        // the "submit" event fires on forms, never on a submit <input>, so a
        // handler on #submit_without_csv is never called. Button click
        // handlers run before the form's submit event, so #csv_value is
        // already set here; a CSV export downloads a file and the page stays
        // put, so no spinner is shown for it.
        $("#query_form").submit(function(event){
            if ($("#csv_value").length === 0) {
                $("#results").html("<img width='25' src='{{ url_for('static', filename='loading.gif') }}'>");
            }
        });
        $("#submit_with_csv").click(function(){
          $("#csv_value").remove();
          $("#query_form").append('<input name="csv" type="hidden" value="true" id="csv_value">');
        });
        $("#submit_without_csv").click(function(){
          $("#csv_value").remove();
        });
    });
  </script>
{% endblock %}

plugins/blueprints/mongo_query.py

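因为pymongo不能像MySQL那样直接执行SQL,只能调用API。为了能在页面上写类似SQL的查询,这里统一用aggregate来查询:查询框里填写JSON格式的聚合管道(pipeline),例如默认的`[{"$match": {}}, {"$sort": {"createAt": -1}}, {"$limit": 100}]`。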

from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint, flash
from flask_admin import BaseView, expose
from flask_admin.base import MenuLink
from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.executors.base_executor import BaseExecutor
from airflow.www import utils as wwwutils
from airflow.utils.db import create_session, provide_session
from wtforms import (
    Form, SelectField, TextAreaField, PasswordField,
    StringField, validators)
from flask import (
    abort, jsonify, redirect, url_for, request, Markup, Response,
    current_app, render_template, make_response, send_file)
from airflow.models.connection import Connection
import pandas as pd
import json


class MongodbQueryView(BaseView):
    """Flask-Admin page that runs a MongoDB aggregation pipeline.

    Modelled on Airflow's Ad Hoc Query view: the user picks a ``mongo``
    connection, a collection name and a JSON aggregation pipeline. The result
    is rendered as an HTML table, or returned as a CSV download when the form
    is posted with ``csv=true``.

    NOTE(review): no login check is applied to this view, so anyone who can
    reach the webserver can query every ``mongo`` connection.
    """

    # Values pre-filled into the form when the corresponding field is empty.
    DEFAULT_PIPELINE = """[{"$match": {}}, {"$sort": {"createAt": -1}}, {"$limit": 100}]"""
    DEFAULT_COLLECTION = 'user_event'

    @expose('/', methods=['POST', 'GET'])
    @wwwutils.gzipped
    @provide_session
    def mongodb_query(self, session=None):
        """Render the query page; on POST, execute the submitted pipeline.

        :param session: SQLAlchemy session injected by ``provide_session``.
        :return: the rendered ``mongodb_query.html`` page, or a CSV
            ``Response`` when the ``.csv`` button was used and the query
            succeeded.
        """
        # Materialise the connections once; the list feeds both the dropdown
        # and the lookup of the selected connection (a Query would re-run).
        dbs = (session.query(Connection)
               .filter(Connection.conn_type == 'mongo')
               .order_by(Connection.conn_id)
               .all())
        db_choices = [(db.conn_id, db.conn_id) for db in dbs if db.get_hook()]
        conn_id_str = request.form.get('conn_id')
        csv = request.form.get('csv') == "true"
        sql = request.form.get('sql') or self.DEFAULT_PIPELINE
        collection = request.form.get('collection') or self.DEFAULT_COLLECTION

        results = ''
        if conn_id_str and request.method == 'POST':
            try:
                df = self._run_pipeline(dbs, conn_id_str, collection, sql)
                results = df.to_html(
                    classes=['table', 'table-bordered', 'table-striped', 'no-wrap'],
                    index=False, na_rep='')
            except Exception as e:
                # Page-level boundary: show any failure (bad JSON, unknown
                # connection, MongoDB error) as a flash message, not a 500.
                flash(str(e), 'error')
            else:
                if csv:
                    # Quote the filename so collection names containing
                    # spaces still produce a valid header value.
                    headers = {"Content-Disposition":
                               'attachment; filename="%s_%s.csv"' % (conn_id_str, collection)}
                    return Response(response=df.to_csv(index=False), status=200,
                                    headers=headers, mimetype="text/csv")

        class QueryForm(Form):
            conn_id = SelectField("Layout", choices=db_choices)
            collection = StringField("Collection")
            sql = TextAreaField("SQL", widget=wwwutils.AceEditorWidget())

        form = QueryForm(request.form, data={'conn_id': conn_id_str, 'sql': sql, 'collection': collection})
        session.commit()
        return self.render("mongodb_query.html", form=form, results=results, title="Mongodb Query")

    @staticmethod
    def _run_pipeline(dbs, conn_id, collection, pipeline_json):
        """Run an aggregation pipeline and return the documents as a DataFrame.

        :param dbs: candidate ``Connection`` objects of type ``mongo``.
        :param conn_id: id of the connection to use; must be one of ``dbs``.
        :param collection: name of the collection to aggregate over.
        :param pipeline_json: the pipeline as a JSON string (a list of stages).
        :return: DataFrame of the result documents with NaN replaced by ''.
            When the documents carry a ``createAt`` field in epoch
            milliseconds, a derived ``create_time`` datetime column is added.
        :raises ValueError: if ``conn_id`` is unknown or ``pipeline_json`` is
            not valid JSON.
        """
        db = next((c for c in dbs if c.conn_id == conn_id), None)
        if db is None:
            raise ValueError("Unknown mongo connection: %s" % conn_id)
        pipeline = json.loads(pipeline_json)
        hook = db.get_hook()
        try:
            # Drain the cursor into a list while the client is still open;
            # this also avoids handing a cursor to pd.DataFrame, which some
            # pandas versions reject.
            records = list(hook.aggregate(collection, pipeline))
        finally:
            # A new hook is built on every request and nothing else closes its
            # MongoClient, so close it here to avoid leaking connections.
            # NOTE(review): assumes MongoHook keeps the client on
            # ``hook.client`` — verify for your Airflow version.
            client = getattr(hook, 'client', None)
            if client is not None:
                client.close()

        df = pd.DataFrame(records)
        if 'createAt' in df.columns:
            try:
                df['create_time'] = pd.to_datetime(df['createAt'], unit='ms')
            except (ValueError, TypeError, OverflowError):
                # Best effort only: skip the derived column when createAt is
                # not epoch milliseconds.
                pass
        return df.fillna('')


# Flask-Admin view instance exported to the plugin (listed under the "My" menu).
v = MongodbQueryView(category="My", name="Mongodb Query")

plugins/blueprints/__init__.py

"""Flask-Admin views and menu links contributed by the blueprints package."""
from .mongo_query import v as mongo_query_v

# Both lists are imported by plugins/__init__.py and attached to the plugin.
admin_views = [
    mongo_query_v,
]
menu_links = list()

plugins/__init__.py

from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint
from .blueprints import admin_views, menu_links

# Blueprint registered so that its template folder (blueprints/templates,
# where mongodb_query.html lives) is available to the webserver.
bp = Blueprint(
    "my_plugin",
    __name__,
    template_folder='blueprints/templates',
    static_url_path='/static',
)


class AirflowMyBlueprintPlugin(AirflowPlugin):
    """Airflow plugin exposing the custom blueprint and admin views."""

    name = "my_plugin"
    flask_blueprints = [bp]
    # Re-export the module-level lists imported from .blueprints.
    admin_views = admin_views
    menu_links = menu_links

用户认证

当然,数据不能让所有人都能查看,所以该视图要判断用户是否已登录:

# Example: protect a custom Flask-Admin view by adding @login_required
# directly under @expose (method body omitted here).
class OnlineUserView(BaseView):
    @expose('/', methods=['GET'])
    # Only logged-in users may reach this view; login_required must be taken
    # from airflow.login after it has been loaded.
    @login_required
    @provide_session
    def view(self, session=None):

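直接引用airflow.login.login_required会报错,因为此时airflow.login还是None,需要先调用airflow.load_login()。按理说不应该这样,airflow源码在启动webserver时确实会调用load_login()。很可能的原因是:插件模块在`import airflow`时就已经被plugins_manager导入了,而webserver的`create_app()`要在这之后才调用`airflow.load_login()`,所以插件被导入时`airflow.login`还没有初始化。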

# The plugin's other imports are all `from airflow.x import y`, which do not
# bind the name `airflow` itself, so import the package explicitly.
import airflow

# When this plugin module is imported, airflow.login can still be None (the
# auth backend has not been loaded yet), so load it before reading
# login_required; otherwise the attribute access fails on None.
if airflow.login is None:
    airflow.load_login()
login_required = airflow.login.login_required

1.10.3还不能细分权限,只能验证是否登录以及是否为超级用户。