企业搜索引擎开发之连接器connector(十)

这里分析一下FeedConnection接口及其实现类GsaFeedConnection相关源码:

FeedConnection接口源码如下: 

/**
 * A channel through which feed data is delivered to a feed server.
 *
 * <p>Callers hand over a {@code FeedData} object holding the data to send.
 * Implementations are expected to establish the actual connection to the
 * feed server during construction or initialization.
 */
public interface FeedConnection {

  /**
   * Sends the data held by the supplied feed data object to the feed server.
   *
   * @param feedData an object that encapsulates the feed data to be sent by
   *        this {@code FeedConnection}
   * @return the response from the feed server
   * @throws FeedException if there is a problem extracting or sending the data
   * @throws RepositoryException if there is a problem retrieving data from
   *         the Connector
   */
  String sendData(FeedData feedData) throws FeedException, RepositoryException;

  /**
   * Reports whether the feed host has a large number of unprocessed feed
   * items.
   *
   * <p>The feed host may temporarily stop processing feed items during
   * periodic maintenance, while the index is being reset, during system
   * configuration, or because of certain error conditions.  A feed client
   * may choose to throttle back its feeds until the backlog clears.
   *
   * @return {@code true} if the feed host is known to be backlogged
   *         processing feeds, {@code false} otherwise
   */
  boolean isBacklogged();

  /**
   * Returns a comma-separated list of the content encodings the feed host
   * supports, for instance {@code "base64binary, base64compressed"}.
   *
   * @return the supported content encodings
   */
  String getContentEncodings();

}

方法String sendData(FeedData feedData)向feed服务器(GSA)发送数据并返回服务器的响应;boolean isBacklogged()检查feed服务器是否积压了大量尚未处理的feed数据(若已积压,客户端可以暂缓发送);String getContentEncodings()返回feed服务器支持的内容编码类型(以逗号分隔)

实现类GsaFeedConnection源码如下:

/**
 * Opens a connection to a url and sends data to it.
 *
 * <p>This {@link FeedConnection} uploads XML feeds to the GSA's
 * {@code /xmlfeed} URL as a {@code multipart/form-data} request.  It also
 * queries the GSA's {@code /getbacklogcount} URL to decide whether feeds
 * should be throttled, and its {@code /getdtd} URL to decide which content
 * encodings are supported.
 *
 * <p>The URLs, the cached content encodings and the backlog state are read
 * and written under the instance lock by the synchronized methods;
 * {@code gotFeedError} is volatile because {@link #sendData} updates it
 * without taking that lock.
 */
public class GsaFeedConnection implements FeedConnection {

  /**
   * The GSA's response when it successfully receives a feed.
   */
  public static final String SUCCESS_RESPONSE = "Success";

  /**
   * The GSA's response when the client is not authorized to send feeds.
   */
  public static final String UNAUTHORIZED_RESPONSE =
      "Error - Unauthorized Request";

  /**
   * The GSA's response when it runs out of disk space.
   */
  public static final String DISKFULL_RESPONSE =
      "Feed not accepted due to insufficient disk space.";

  /**
   * The GSA's response when there was an internal error.
   */
  public static final String INTERNAL_ERROR_RESPONSE = "Internal Error";

  // Multipart/form-data uploads require a boundary to delimit controls.
  // Since we XML-escape or base64-encode all data provided by the connector,
  // the feed XML will never contain "<<".
  private static final String BOUNDARY = "<<";

  private static final String CRLF = "\r\n";

  // Content encodings supported by GSA.  Null until first requested (or
  // explicitly set); reset to null whenever the feed host changes.
  private String contentEncodings = null;

  // True if we recently got a feed error of some sort.
  // Volatile: sendData() writes this flag without holding the instance lock,
  // while isBacklogged() reads and clears it while holding the lock.
  private volatile boolean gotFeedError = false;

  // XmlFeed URL
  private URL feedUrl = null;

  // XmlFeed DTD URL
  private URL dtdUrl = null;

  // BacklogCount URL
  private URL backlogUrl = null;

  // BacklogCount Ceiling. Throttle back feed if backlog exceeds the ceiling.
  private int backlogCeiling = 50000;

  // BacklogCount Floor. Stop throttling feed if backlog drops below floor.
  private int backlogFloor = 15000;

  // True if the feed is throttled back due to excessive backlog.
  private boolean isBacklogged = false;

  // Time of last backlog check, in milliseconds as returned by
  // System.currentTimeMillis().  Long.MAX_VALUE means "never check again".
  private long lastBacklogCheck;

  // How often to check for backlog (in milliseconds).
  private long backlogCheckInterval = 15 * 60 * 1000L;

  private static final Logger LOGGER =
      Logger.getLogger(GsaFeedConnection.class.getName());

  /**
   * Creates a feed connection that targets the given GSA host and port.
   *
   * @param host the feed host name
   * @param port the feed host port
   * @throws MalformedURLException if a URL cannot be built from the
   *         supplied host and port
   */
  public GsaFeedConnection(String host, int port) throws MalformedURLException {
    this.setFeedHostAndPort(host, port);
  }

  /**
   * Points this connection at a (possibly different) feed host.  Besides
   * rebuilding the feed, DTD and backlog-count URLs, this discards the cached
   * content encodings and resets the backlog check timer so that both are
   * re-evaluated against the new host.
   *
   * @param host the feed host name
   * @param port the feed host port
   * @throws MalformedURLException if a URL cannot be built from the
   *         supplied host and port
   */
  public synchronized void setFeedHostAndPort(String host, int port)
      throws MalformedURLException {
    feedUrl = new URL("http", host, port, "/xmlfeed");
    dtdUrl = new URL("http", host, port, "/getdtd");
    contentEncodings = null;
    backlogUrl = new URL("http", host, port, "/getbacklogcount");
    lastBacklogCheck = 0L;
  }

  /**
   * Set the backlog check parameters. The Feed connection can check to see
   * if the GSA is falling behind processing feeds by calling the GSA's
   * {@code getbacklogcount} servlet. If the number of outstanding feed
   * items exceeds the {@code ceiling}, then the GSA is considered
   * backlogged.  If the number of outstanding feed items then drops below
   * the {@code floor}, it may be considered no longer backlogged.
   *
   * <p>NOTE(review): this setter does not take the instance lock that
   * {@link #isBacklogged()} holds while reading these values; presumably it
   * is only called during configuration - confirm against callers.
   *
   * @param floor backlog count floor value, below which the GSA is no
   *        longer considered backlogged.
   * @param ceiling backlog count ceiling value, above which the GSA is
   *        considered backlogged.
   * @param interval number of seconds to wait between backlog count checks.
   */
  public void setBacklogCheck(int floor, int ceiling, int interval) {
    backlogFloor = floor;
    backlogCeiling = ceiling;
    // The interval is supplied in seconds but stored in milliseconds.
    backlogCheckInterval = interval * 1000L;
  }

  /**
   * Overrides the content encodings reported by
   * {@link #getContentEncodings()}.  Passing {@code null} causes the
   * encodings to be determined from the GSA on the next request.
   *
   * @param contentEncodings comma-separated list of content encodings,
   *        or {@code null}
   */
  public void setContentEncodings(String contentEncodings) {
    this.contentEncodings = contentEncodings;
  }

  /**
   * Appends the header of one multipart/form-data control to the builder:
   * the boundary line, the Content-Disposition and Content-Type headers and
   * the blank line that precedes the control's value.
   *
   * @param builder the buffer to append to
   * @param name the form control name
   * @param mimetype the MIME type of the control's value
   */
  private static final void controlHeader(StringBuilder builder,
        String name, String mimetype) {
    builder.append("--").append(BOUNDARY).append(CRLF);
    builder.append("Content-Disposition: form-data;");
    builder.append(" name=\"").append(name).append("\"").append(CRLF);
    builder.append("Content-Type: ").append(mimetype).append(CRLF);
    builder.append(CRLF);
  }

  /**
   * Sends the given feed to the GSA and records whether the attempt failed,
   * so that {@link #isBacklogged()} can later probe the feed host.
   *
   * @param feedData the feed to send; it is cast to {@code XmlFeed}
   * @return the response text from the GSA
   * @throws FeedException if the feed could not be sent or the response
   *         could not be read
   */
  /* @Override */
  public String sendData(FeedData feedData)
      throws FeedException {
    try {
      String response = sendFeedData((XmlFeed)feedData);
      // Anything other than the success message counts as a feed error.
      gotFeedError = !response.equalsIgnoreCase(SUCCESS_RESPONSE);
      return response;
    } catch (FeedException fe) {
      gotFeedError = true;
      throw fe;
    }
  }

  /**
   * Uploads the feed as a multipart/form-data request with three controls
   * ({@code datasource}, {@code feedtype} and {@code data}) and returns the
   * text of the GSA's response.
   *
   * @param feed the feed to upload
   * @return the response body, with line terminators removed
   * @throws FeedException if the connection cannot be opened, the feed
   *         cannot be written, or the response cannot be read
   */
  private String sendFeedData(XmlFeed feed)
      throws FeedException {
    String feedType = feed.getFeedType();
    String dataSource = feed.getDataSource();
    OutputStream outputStream;
    HttpURLConnection uc;
    StringBuilder buf = new StringBuilder();
    byte[] prefix;
    byte[] suffix;
    try {
      // Build prefix.
      controlHeader(buf, "datasource", ServletUtil.MIMETYPE_TEXT_PLAIN);
      buf.append(dataSource).append(CRLF);
      controlHeader(buf, "feedtype", ServletUtil.MIMETYPE_TEXT_PLAIN);
      buf.append(feedType).append(CRLF);
      controlHeader(buf, "data", ServletUtil.MIMETYPE_XML);
      prefix = buf.toString().getBytes("UTF-8");

      // Build suffix.
      buf.setLength(0);
      buf.append(CRLF).append("--").append(BOUNDARY).append("--").append(CRLF);
      suffix = buf.toString().getBytes("UTF-8");

      LOGGER.finest("Opening feed connection.");
      // feedUrl may be replaced by setFeedHostAndPort(), so read it under
      // the same lock.
      synchronized (this) {
        uc = (HttpURLConnection) feedUrl.openConnection();
      }
      uc.setDoInput(true);
      uc.setDoOutput(true);
      // The total request length is declared up front so the body can be
      // streamed.
      uc.setFixedLengthStreamingMode(prefix.length + feed.size()
          + suffix.length);
      uc.setRequestProperty("Content-Type", "multipart/form-data; boundary="
          + BOUNDARY);
      outputStream = uc.getOutputStream();
    } catch (IOException ioe) {
      throw new FeedException(ioe);
    }

    // Tracks whether an exception is already propagating, so that failures
    // during cleanup or while reading the response do not replace it.
    boolean isThrowing = false;
    buf.setLength(0);
    try {
      LOGGER.finest("Writing feed data to feed connection.");
      // If there is an exception during this read/write, we do our
      // best to close the url connection and read the result.
      try {
        outputStream.write(prefix);
        feed.writeTo(outputStream);
        outputStream.write(suffix);
        outputStream.flush();
      } catch (IOException e) {
        LOGGER.log(Level.SEVERE,
            "IOException while posting: will retry later", e);
        isThrowing = true;
        throw new FeedException(e);
      } catch (RuntimeException e) {
        isThrowing = true;
        throw e;
      } catch (Error e) {
        isThrowing = true;
        throw e;
      } finally {
        try {
          outputStream.close();
        } catch (IOException e) {
          LOGGER.log(Level.SEVERE,
              "IOException while closing after post: will retry later", e);
          if (!isThrowing) {
            isThrowing = true;
            throw new FeedException(e);
          }
        }
      }
    } finally {
      BufferedReader br = null;
      try {
        LOGGER.finest("Waiting for response from feed connection.");
        InputStream inputStream = uc.getInputStream();
        br = new BufferedReader(new InputStreamReader(inputStream, "UTF8"));
        String line;
        while ((line = br.readLine()) != null) {
          buf.append(line);
        }
      } catch (IOException ioe) {
        // Only report a failure to read the response if nothing more
        // important is already being thrown.
        if (!isThrowing) {
          throw new FeedException(ioe);
        }
      } finally {
        try {
          if (br != null) {
            br.close();
          }
        } catch (IOException e) {
          LOGGER.log(Level.SEVERE,
                     "IOException while closing after post: continuing", e);
        }
        // uc is always assigned by the time we get here.
        uc.disconnect();
        if (LOGGER.isLoggable(Level.FINEST)) {
          LOGGER.finest("Received response from feed connection: "
                        + buf.toString());
        }
      }
    }
    return buf.toString();
  }

  /**
   * Returns the content encodings supported by the GSA.  The value is
   * determined on first use by requesting the feed DTD, and is cached until
   * the feed host changes or the encodings are explicitly set.
   *
   * @return a comma-separated list of supported content encodings
   */
  /* @Override */
  public synchronized String getContentEncodings() {
    if (contentEncodings == null) {
      String dtd = getDtd();
      if (dtd == null) {
        // Failed to get a DTD. Assume the GSA only supports base64 encoded.
        contentEncodings = "base64binary";
      } else {
        // TODO: Extract the supported content encodings from the DTD.
        // As of GSA 6.2, returning a DTD at all also means compression
        // is supported.
        contentEncodings = "base64binary,base64compressed";
      }
      if (LOGGER.isLoggable(Level.FINE)) {
        LOGGER.fine("GSA supports Content Encodings: " + contentEncodings);
      }
    }
    return contentEncodings;
  }

  /**
   * Reports whether feeds should be held back.  At most once per backlog
   * check interval this contacts the GSA: if a feed error was recorded it
   * first probes whether feeds are accepted again, and it then compares the
   * GSA's backlog count against the configured floor and ceiling.  Between
   * checks the previously determined backlog state is returned.
   *
   * @return {@code true} if the feed host is unavailable after a feed error
   *         or is considered backlogged, {@code false} otherwise
   */
  /* @Override */
  public synchronized boolean isBacklogged() {
    // Long.MAX_VALUE marks a GSA without getbacklogcount support.
    if (lastBacklogCheck != Long.MAX_VALUE) {
      long now = System.currentTimeMillis();
      if ((now - lastBacklogCheck) > backlogCheckInterval) {
        lastBacklogCheck = now;
        // If we got a feed error and the feed is still down, delay.
        if (gotFeedError) {
          if (isFeedAvailable()) {
            gotFeedError = false;
          } else {
            // Feed is still unavailable.
            return true;
          }
        }
        try {
          int backlogCount = getBacklogCount();
          // A negative count means the count is currently unavailable;
          // leave the backlog state unchanged in that case.
          if (backlogCount >= 0) {
            if (isBacklogged) {
              // If we were backlogged, but have dropped below the
              // floor value, then we are no longer backlogged.
              if (backlogCount < backlogFloor) {
                isBacklogged = false;
                LOGGER.info("Resuming traversal after feed backlog clears.");
              }
            } else if (backlogCount > backlogCeiling) {
              // If the backlogcount exceeds the ceiling value,
              // then we are definitely backlogged.
              isBacklogged = true;
              LOGGER.info("Pausing traversal due to excessive feed backlog.");
            }
          }
        } catch (UnsupportedOperationException e) {
          // This older GSA does not support getbacklogcount.
          // Assume never backlogged and don't check again.
          isBacklogged = false;
          lastBacklogCheck = Long.MAX_VALUE;
          LOGGER.fine("Older GSA lacks backlogcount support.");
        }
      }
    }
    return isBacklogged;
  }

  /**
   * @return the current feed backlog count of the GSA,
   *         or -1 if the count is unavailable.
   * @throws UnsupportedOperationException if the GSA does
   *         not support getbacklogcount.
   */
  private int getBacklogCount() {
    try {
      HttpResponse response = doGet(backlogUrl, "backlogcount");
      if (response != null && response.content != null) {
        return Integer.parseInt(response.content);
      }
    } catch (NumberFormatException ignored) {
      // Got a non-integer backlog count - probably an error message,
      // which we have already logged (at Finest).  Simply return -1,
      // indicating that the backlogcount is not currently available.
    }
    // If we get here something bad happened.  It is not the case that the
    // GSA doesn't support getbacklogcount, but we still failed to retrieve it.
    return -1;
  }

  /**
   * Tests for feed error conditions such as insufficient disk space,
   * unauthorized clients, etc.  If the /xmlfeed command is sent with no
   * arguments, the server will return an error message and a 200 response
   * code if it can't accept feeds.  If it can continue to accept feeds, then
   * it will return a 400 bad request since it's missing required parameters.
   *
   * @return True if feed host is likely to accept a feed request.
   */
  private boolean isFeedAvailable() {
    try {
      HttpResponse response = doGet(feedUrl, "XmlFeed");
      if (response != null) {
        if (response.responseCode == HttpURLConnection.HTTP_BAD_REQUEST) {
          // The expected responseCode if no error conditions are present.
          LOGGER.finest("XmlFeed connection seems to be accepting new feeds.");
          return true;
        }
        if (response.content != null) {
          // Content is only present for a 200 response.  The feed host is
          // considered available only if that message reports success; the
          // result of this test must be returned, not discarded.
          return response.content.contains(SUCCESS_RESPONSE);
        }
      }
    } catch (UnsupportedOperationException ignored) {
      // This GSA does not support feeds?  Return false.
    }
    // If we get here something bad happened.
    return false;
  }

  /**
   * @return the current feed XML DTD for the GSA,
   *         or null if the DTD is unavailable.
   */
  private String getDtd() {
    try {
      HttpResponse response = doGet(dtdUrl, "DTD");
      if (response != null && response.content != null) {
        return response.content;
      }
    } catch (UnsupportedOperationException ignored) {
      // This older GSA does not support getdtd, so return null.
      LOGGER.fine("Older GSA lacks get DTD support.");
    }
    return null;
  }

  /**
   * Get the response to a URL request.  The response is returned
   * as an HttpResponse containing the HTTP ResponseCode and the
   * returned content as a String. The content String is only returned
   * if the response code was OK.
   *
   * @param url the URL to request
   * @param name the name of the feature requested (for logging)
   * @return HttpResponse representing response to an HTTP GET.
   *         or null if the GSA is unavailable.
   * @throws UnsupportedOperationException if the GSA does
   *         not support the requested feature.
   */
  private HttpResponse doGet(URL url, String name) {
    HttpURLConnection conn = null;
    BufferedReader br = null;
    String str = null;
    StringBuilder buf = new StringBuilder();
    try {
      if (LOGGER.isLoggable(Level.FINEST)) {
        LOGGER.finest("Opening " + name + " connection.");
      }
      conn = (HttpURLConnection)url.openConnection();
      conn.connect();
      int responseCode = conn.getResponseCode();
      if (responseCode == HttpURLConnection.HTTP_OK) {
        br = new BufferedReader(new InputStreamReader(conn.getInputStream(),
                                                      "UTF8"));
        // Lines are concatenated without their terminators.
        while ((str = br.readLine()) != null) {
          buf.append(str);
        }
        str = buf.toString().trim();
        if (LOGGER.isLoggable(Level.FINEST)) {
          LOGGER.finest("Received " + name + ": " + str);
        }
        return new HttpResponse(responseCode, str);
      } else if (responseCode == HttpURLConnection.HTTP_NOT_FOUND) {
        // A missing servlet is reported as an unsupported feature.
        throw new UnsupportedOperationException(
            "GSA lacks " + name + " support.");
      } else {
        return new HttpResponse(responseCode);
      }
    } catch (IOException ioe) {
      LOGGER.finest("Error while reading " + name + ": " + ioe.getMessage());
    } finally {
      try {
        if (br != null) {
          br.close();
        }
      } catch (IOException e) {
        LOGGER.finest("Error after reading " + name + ": " + e.getMessage());
      }
      if (conn != null) {
        conn.disconnect();
      }
    }
    // If we get here something bad happened. It is not the case that the GSA
    // doesn't support the requested feature, but we failed to retrieve it.
    return null;
  }

  /**
   * Immutable holder for the outcome of an HTTP GET: the response code and,
   * when one was read, the response content.
   */
  private static class HttpResponse {
    public final int responseCode;  // The HTTP response code.
    public final String content;    // The returned content, or null if none.

    public HttpResponse(int responseCode) {
      this(responseCode, null);
    }

    public HttpResponse(int responseCode, String content) {
      this.responseCode = responseCode;
      this.content = content;
    }
  }
}

在String sendFeedData(XmlFeed feed)方法中,是通过HttpURLConnection类来发送数据的:以multipart/form-data格式向HttpURLConnection的OutputStream输出流写入xmlfeed数据,然后从其InputStream读取GSA返回的响应

---------------------------------------------------------------------------

本系列企业搜索引擎开发之连接器connector系本人原创

转载请注明出处 博客园 刺猬的温驯

本文链接http://www.cnblogs.com/chenying99/archive/2013/03/19/2968427.html

原文地址:https://www.cnblogs.com/chenying99/p/2968427.html