十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
[[198035]]

1. 概述
可能你在看到这个标题时会小小地吃惊:MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。
吼吼吼,让我们开启这段神奇的“旅途”。
本文主要分成四部分:
2. 主流程
MyCAT Server 接收 MySQL Client 基于 MySQL协议 的请求,翻译 SQL 成 MongoDB操作 发送给 MongoDB Server。
MyCAT Server 接收 MongoDB Server 返回的 MongoDB数据,翻译成 MySQL数据结果 返回给 MySQL Client。
这样一看,MyCAT 连接 MongoDB 是不是少了一点神奇咧。
Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC也是Sun Microsystems的商标。JDBC是面向关系型数据库的。
MyCAT 使用 JDBC 规范,抽象了对 MongoDB 的访问。通过这样的方式,MyCAT 也抽象了对 SequoiaDB 的访问。可能这样的说法有些抽象,看个类图压压惊。
是不是熟悉的味道。不得不说 JDBC 规范的精妙。
3. 查询操作
- SELECT id, name FROM user WHERE name > '' ORDER BY _id DESC;
 
看顺序图已经可以很方便地理解整体逻辑,我就不多废话啦。我们来看几个核心的代码逻辑。
1、查询 MongoDB
- // MongoSQLParser.java
 - public MongoData query() throws MongoSQLException {
 - if (!(statement instanceof SQLSelectStatement)) {
 - //return null;
 - throw new IllegalArgumentException("not a query sql statement");
 - }
 - MongoData mongo = new MongoData();
 - DBCursor c = null;
 - SQLSelectStatement selectStmt = (SQLSelectStatement) statement;
 - SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();
 - int icount = 0;
 - if (sqlSelectQuery instanceof MySqlSelectQueryBlock) {
 - MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock) selectStmt.getSelect().getQuery();
 - BasicDBObject fields = new BasicDBObject();
 - // 显示(返回)的字段
 - for (SQLSelectItem item : mysqlSelectQuery.getSelectList()) {
 - //System.out.println(item.toString());
 - if (!(item.getExpr() instanceof SQLAllColumnExpr)) {
 - if (item.getExpr() instanceof SQLAggregateExpr) {
 - SQLAggregateExpr expr = (SQLAggregateExpr) item.getExpr();
 - if (expr.getMethodName().equals("COUNT")) { // TODO 待读:count(*)
 - icount = 1;
 - mongo.setField(getExprFieldName(expr), Types.BIGINT);
 - }
 - fields.put(getExprFieldName(expr), 1);
 - } else {
 - fields.put(getFieldName(item), 1);
 - }
 - }
 - }
 - // 表名
 - SQLTableSource table = mysqlSelectQuery.getFrom();
 - DBCollection coll = this._db.getCollection(table.toString());
 - mongo.setTable(table.toString());
 - // WHERE
 - SQLExpr expr = mysqlSelectQuery.getWhere();
 - DBObject query = parserWhere(expr);
 - // GROUP BY
 - SQLSelectGroupByClause groupby = mysqlSelectQuery.getGroupBy();
 - BasicDBObject gbkey = new BasicDBObject();
 - if (groupby != null) {
 - for (SQLExpr gbexpr : groupby.getItems()) {
 - if (gbexpr instanceof SQLIdentifierExpr) {
 - String name = ((SQLIdentifierExpr) gbexpr).getName();
 - gbkey.put(name, Integer.valueOf(1));
 - }
 - }
 - icount = 2;
 - }
 - // SKIP / LIMIT
 - int limitoff = 0;
 - int limitnum = 0;
 - if (mysqlSelectQuery.getLimit() != null) {
 - limitoff = getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset());
 - limitnum = getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount());
 - }
 - if (icount == 1) { // COUNT(*)
 - mongo.setCount(coll.count(query));
 - } else if (icount == 2) { // MapReduce
 - BasicDBObject initial = new BasicDBObject();
 - initial.put("num", 0);
 - String reduce = "function (obj, prev) { " + " prev.num++}";
 - mongo.setGrouyBy(coll.group(gbkey, query, initial, reduce));
 - } else {
 - if ((limitoff > 0) || (limitnum > 0)) {
 - c = coll.find(query, fields).skip(limitoff).limit(limitnum);
 - } else {
 - c = coll.find(query, fields);
 - }
 - // order by
 - SQLOrderBy orderby = mysqlSelectQuery.getOrderBy();
 - if (orderby != null) {
 - BasicDBObject order = new BasicDBObject();
 - for (int i = 0; i < orderby.getItems().size(); i++) {
 - SQLSelectOrderByItem orderitem = orderby.getItems().get(i);
 - order.put(orderitem.getExpr().toString(), getSQLExprToAsc(orderitem.getType()));
 - }
 - c.sort(order);
 - // System.out.println(order);
 - }
 - }
 - mongo.setCursor(c);
 - }
 - return mongo;
 - }
 
2、查询条件
- // MongoSQLParser.java
 - private void parserWhere(SQLExpr aexpr, BasicDBObject o) {
 - if (aexpr instanceof SQLBinaryOpExpr) {
 - SQLBinaryOpExpr expr = (SQLBinaryOpExpr) aexpr;
 - SQLExpr exprL = expr.getLeft();
 - if (!(exprL instanceof SQLBinaryOpExpr)) {
 - if (expr.getOperator().getName().equals("=")) {
 - o.put(exprL.toString(), getExpValue(expr.getRight()));
 - } else {
 - String op = "";
 - if (expr.getOperator().getName().equals("<")) {
 - op = "$lt";
 - } else if (expr.getOperator().getName().equals("<=")) {
 - op = "$lte";
 - } else if (expr.getOperator().getName().equals(">")) {
 - op = "$gt";
 - } else if (expr.getOperator().getName().equals(">=")) {
 - op = "$gte";
 - } else if (expr.getOperator().getName().equals("!=")) {
 - op = "$ne";
 - } else if (expr.getOperator().getName().equals("<>")) {
 - op = "$ne";
 - }
 - parserDBObject(o, exprL.toString(), op, getExpValue(expr.getRight()));
 - }
 - } else {
 - if (expr.getOperator().getName().equals("AND")) {
 - parserWhere(exprL, o);
 - parserWhere(expr.getRight(), o);
 - } else if (expr.getOperator().getName().equals("OR")) {
 - orWhere(exprL, expr.getRight(), o);
 - } else {
 - throw new RuntimeException("Can't identify the operation of of where");
 - }
 - }
 - }
 - }
 - private void orWhere(SQLExpr exprL, SQLExpr exprR, BasicDBObject ob) {
 - BasicDBObject xo = new BasicDBObject();
 - BasicDBObject yo = new BasicDBObject();
 - parserWhere(exprL, xo);
 - parserWhere(exprR, yo);
 - ob.put("$or", new Object[]{xo, yo});
 - }
 
3、解析 MongoDB 数据
- // MongoResultSet.java
 /**
  * Builds a result set on top of the outcome of a MongoDB query.
  *
  * <p>With a cursor, the column names come from the cursor's requested keys or,
  * when none were requested, from the first document; the first document is
  * fetched eagerly. Without a cursor (count / group results), the columns come
  * from the fields recorded on {@code mongo}.
  *
  * @param mongo  the query outcome (cursor, count or grouped result)
  * @param schema the schema name reported by this result set
  * @throws SQLException declared by the signature
  */
 public MongoResultSet(MongoData mongo, String schema) throws SQLException {
     this._schema = schema;
     this._table = mongo.getTable();
     this._cursor = mongo.getCursor();

     this._sum = mongo.getCount();
     this.isSum = mongo.getCount() > 0;

     this.isGroupBy = mongo.getType();
     if (this.isGroupBy) {
         // Grouped results are flagged as isSum as well.
         this.isSum = true;
         dblist = mongo.getGrouyBys();
     }

     if (this._cursor == null) {
         // No cursor: use the fields recorded on the query outcome.
         SetFields(mongo.getFields().keySet());
         SetFieldType(mongo.getFields());
         return;
     }

     select = _cursor.getKeysWanted().keySet().toArray(new String[0]);

     // Resolve the fields from the first document when no keys were requested.
     if (this._cursor.hasNext()) {
         _cur = _cursor.next();
         if (_cur != null) {
             if (select.length == 0) {
                 SetFields(_cur.keySet());
             }
             _row = 1;
         }
     }

     // Set the field types. select.length is read again here, after the
     // optional SetFields call above.
     boolean nothingSelected = select.length == 0;
     if (nothingSelected) {
         select = new String[]{"_id"};
     }
     SetFieldType(nothingSelected);
 }
 
4、返回数据给 MySQL Client
- // JDBCConnection.java
 - private void ouputResultSet(ServerConnection sc, String sql)
 - throws SQLException {
 - ResultSet rs = null;
 - Statement stmt = null;
 - try {
 - stmt = con.createStatement();
 - rs = stmt.executeQuery(sql);
 - // header
 - List
 fieldPks = new LinkedList<>(); - ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs, this.isSpark);
 - int colunmCount = fieldPks.size();
 - ByteBuffer byteBuf = sc.allocate();
 - ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket();
 - headerPkg.fieldCount = fieldPks.size();
 - headerPkg.packetId = ++packetId;
 - byteBuf = headerPkg.write(byteBuf, sc, true);
 - byteBuf.flip();
 - byte[] header = new byte[byteBuf.limit()];
 - byteBuf.get(header);
 - byteBuf.clear();
 - List
 fields = new ArrayList (fieldPks.size()); - for (FieldPacket curField : fieldPks) {
 - curField.packetId = ++packetId;
 - byteBuf = curField.write(byteBuf, sc, false);
 - byteBuf.flip();
 - byte[] field = new byte[byteBuf.limit()];
 - byteBuf.get(field);
 - byteBuf.clear();
 - fields.add(field);
 - }
 - // header eof
 - EOFPacket eofPckg = new EOFPacket();
 - eofPckg.packetId = ++packetId;
 - byteBuf = eofPckg.write(byteBuf, sc, false);
 - byteBuf.flip();
 - byte[] eof = new byte[byteBuf.limit()];
 - byteBuf.get(eof);
 - byteBuf.clear();
 - this.respHandler.fieldEofResponse(header, fields, eof, this);
 - // row
 - while (rs.next()) {
 - RowDataPacket curRow = new RowDataPacket(colunmCount);
 - for (int i = 0; i < colunmCount; i++) {
 - int j = i + 1;
 - if (MysqlDefs.isBianry((byte) fieldPks.get(i).type)) {
 - curRow.add(rs.getBytes(j));
 - } else if (fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL ||
 - fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL - 256)) { // field type is unsigned byte
 - // ensure that do not use scientific notation format
 - BigDecimal val = rs.getBigDecimal(j);
 - curRow.add(StringUtil.encode(val != null ? val.toPlainString() : null, sc.getCharset()));
 - } else {
 - curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset()));
 - }
 - }
 - curRow.packetId = ++packetId;
 - byteBuf = curRow.write(byteBuf, sc, false);
 - byteBuf.flip();
 - byte[] row = new byte[byteBuf.limit()];
 - byteBuf.get(row);
 - byteBuf.clear();
 - this.respHandler.rowResponse(row, this);
 - }
 - fieldPks.clear();
 - // row eof
 - eofPckg = new EOFPacket();
 - eofPckg.packetId = ++packetId;
 - byteBuf = eofPckg.write(byteBuf, sc, false);
 - byteBuf.flip();
 - eof = new byte[byteBuf.limit()];
 - byteBuf.get(eof);
 - sc.recycle(byteBuf);
 - this.respHandler.rowEofResponse(eof, this);
 - } finally {
 - if (rs != null) {
 - try {
 - rs.close();
 - } catch (SQLException e) {
 - }
 - }
 - if (stmt != null) {
 - try {
 - stmt.close();
 - } catch (SQLException e) {
 - }
 - }
 - }
 - }
 - // MongoResultSet.java
 - @Override
 - public String getString(String columnLabel) throws SQLException {
 - Object x = getObject(columnLabel);
 - if (x == null) {
 - return null;
 - }
 - return x.toString();
 - }
 
当返回字段值是 Object 时,返回该对象.toString()。例如:
- mysql> select * from user order by _id asc;
 - +--------------------------+------+-------------------------------+
 - | _id | name | profile |
 - +--------------------------+------+-------------------------------+
 - | 1 | 123 | { "age" : 1 , "height" : 100} |
 
4. 插入操作
- // MongoSQLParser.java
 - public int executeUpdate() throws MongoSQLException {
 - if (statement instanceof SQLInsertStatement) {
 - return InsertData((SQLInsertStatement) statement);
 - }
 - if (statement instanceof SQLUpdateStatement) {
 - return UpData((SQLUpdateStatement) statement);
 - }
 - if (statement instanceof SQLDropTableStatement) {
 - return dropTable((SQLDropTableStatement) statement);
 - }
 - if (statement instanceof SQLDeleteStatement) {
 - return DeleteDate((SQLDeleteStatement) statement);
 - }
 - if (statement instanceof SQLCreateTableStatement) {
 - return 1;
 - }
 - return 1;
 - }
 - private int InsertData(SQLInsertStatement state) {
 - if (state.getValues().getValues().size() == 0) {
 - throw new RuntimeException("number of columns error");
 - }
 - if (state.getValues().getValues().size() != state.getColumns().size()) {
 - throw new RuntimeException("number of values and columns have to match");
 - }
 - SQLTableSource table = state.getTableSource();
 - BasicDBObject o = new BasicDBObject();
 - int i = 0;
 - for (SQLExpr col : state.getColumns()) {
 - o.put(getFieldName2(col), getExpValue(state.getValues().getValues().get(i)));
 - i++;
 - }
 - DBCollection coll = this._db.getCollection(table.toString());
 - coll.insert(o);
 - return 1;
 - }