说明

UV, 一个用 Rust 编写的极快的 Python 包和项目管理器。

安装

独立安装

uv 提供了一个独立的安装程序来下载和安装 uv:
Widnow使用irm以下方式下载脚本并执行iex

1
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"

通过在 URL 中包含特定版本来请求它

1
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/0.6.14/install.ps1 | iex"

WinGet

1
winget install --id=astral-sh.uv  -e

Scoop

1
scoop install main/uv

安装完成后使用以下命令来检查uv是否可用:

1
uv

使用

uv 提供了 Python 开发的基本功能——从安装 Python 和支持简单脚本到处理支持多个 Python 版本和平台的大型项目。目前uv 主要分为下面几个功能,下面简单介绍一下使用。

  • Python版本的管理
  • 脚本执行
  • 项目管理

项目管理

uv 支持管理 Python 项目,在文件中定义其依赖关系pyproject.toml
主要包含下面命令:

  • uv init:创建一个新的 Python 项目。
  • uv add:向项目添加依赖项。
  • uv remove:从项目中删除依赖项。
  • uv sync:将项目的依赖项与环境同步。
  • uv lock:为项目的依赖项创建一个锁文件。
  • uv run:在项目环境中运行命令。
  • uv tree:查看项目的依赖关系树。
  • uv build:将项目构建到分发档案中。
  • uv publish:将项目发布到包索引。

创建新项目

可以使用以下命令创建一个新的 Python 项目uv init

1
2
uv init hello-world
cd hello-world

或者,您可以在工作目录中初始化一个项目:

1
2
3
mkdir hello-world
cd hello-world
uv init

uv 将创建以下文件:

1
2
3
4
5
.
├── .python-version
├── README.md
├── main.py
└── pyproject.toml

项目结构

一个项目由几个重要的部分组成,它们协同工作,并允许 uv 管理你的项目。除了 由 创建的文件之外, 当你第一次运行项目命令(即、 或 )时, uv inituv 还会在项目的根目录中创建一个虚拟环境和文件。完整列表如下:

1
2
3
4
5
6
7
8
9
10
.
├── .venv //虚拟环境
│   ├── bin
│   ├── lib
│   └── pyvenv.cfg
├── .python-version //包含项目的默认 Python 版本。此文件告诉 uv 在创建项目的虚拟环境时使用哪个 Python 版本。
├── README.md
├── main.py
├── pyproject.toml // 有关您的项目的元数据
└── uv.lock

其中uv.lock是一个跨平台的锁文件,其中包含项目依赖项的精确信息。与pyproject.toml用于指定项目总体需求的 不同,锁文件包含项目环境中已安装的精确解析版本。==此文件应纳入版本控制,以便在不同机器上实现一致且可重复的安装==。uv.lock是人类可读的 TOML 文件,但由 uv 管理,不应手动编辑。

管理依赖项

可以使用命令uv add添加依赖项,这也会更新锁pyproject.toml文件和项目环境```

1
2
3
4
5
uv add requests
# 指定特定的版本
uv add 'requests==2.31.0'
# 指定特定的源
uv add git+https://github.com/psf/requests

如果要删除依赖,可以使用uv remove 命令。

1
uv remove requests

要升级软件包,请uv lock使用以下--upgrade-package标志运行:

1
uv lock --upgrade-package requests

--upgrade-package标志将尝试将指定的包更新到最新的兼容版本,同时保持锁文件的其余部分完好无损。

运行命令

uv run可用于在您的项目环境中运行任意脚本或命令。

在每次调用之前uv run,uv 都会验证锁文件是否是最新的 pyproject.toml,并且环境是否是最新的锁文件,从而使您的项目保持同步,而无需手动干预。uv run保证您的命令在一致的、锁定的环境中运行。

例如,使用flask

1
2
uv add flask
uv run -- flask run -p 3000

也可以使用uv sync手动更新环境,然后在执行命令之前激活它:

1
2
3
uv sync
source .venv\Scripts\activate
flask run -p 3000

构建发行版

uv build可用于为您的项目构建源分布和二进制分布(轮子)。
默认情况下,uv build将在当前目录中构建项目,并将构建的工件放在dist/子目录中:

1
2
uv build
ls dist/

Python管理

如果您的系统上已安装 Python,uv 将 检测并使用它,无需任何配置。但是,uv 还可以安装和管理 Python 版本。uv 会根据需要自动安装缺少的 Python 版本——您无需安装 Python 即可开始使用。

安装最新的版本

1
uv python install

安装特定的版本

1
uv python install 3.12

重新安装

1
uv python install --reinstall

查看已安装的Python版本

1
uv python list

运行脚本

Python 脚本是用于独立执行的文件,例如python <script>.py。使用 uv 执行脚本可确保管理脚本依赖关系,而无需手动管理环境。

什么是Scoop?

Scoop 是 Windows 的命令行安装程序,是一个强大的包管理工具。可以在 github 上找到其项目的相关信息,项目地址, Scoop 等一系列包管理器的诞生,第一大便利就是省去了上述繁琐的「搜索 - 下载 - 安装」的步骤,让我们能够通过「一行代码」急速安装。

同时,用 Scoop 来安装和管理我们的软件:

  • 集搜索、下载、安装、更新软件于一体:极大的降低了安装维护一个软件的成本,我们甚至不必在软件本身的复杂菜单中寻找那个更新按钮来更新软件自己
  • 将软件干干净净的安装到电脑的「用户文件夹」下:这样既不会污染路径也不会请求不必要的权限(UAC)
  • 在卸载软件的时候,能够尽量清空软件在电脑上存储的任何数据和痕迹

Scoop 最适合安装那种干净、小巧、开源的软件。并且,Scoop 也极度适合为开发者配置开发环境,不过这些很多都涉及到进阶使用技巧。下面先从基础的安装方法开始介绍。

安装

要求:

  • PowerShell >= 5.0 (如果是 Window10 则默认满足此条件)
  • 请确保已允许PowerShell执行本地脚本,可以使用下面的命令开启:
    1
    2
    set-executionpolicy remotesigned -scope currentuser

从 Power Shell 终端运行以下命令来安装 Scoop:

1
2
Set-ExecutionPolicy -ExecutionPolicy RemoteSigned -Scope CurrentUser
Invoke-RestMethod -Uri https://get.scoop.sh | Invoke-Expression

它会将 Scoop 安装到其默认位置:C:\Users\<YOUR USERNAME>\scoop 。 全局安装的程序(所有用户可用,使用--global或 -g 选项)位 于C\ProgramData\scoop路径中。

如果想自定义安装位置, 可以在==安装前==设置 :
用户安装目录:

1
2
$env:SCOOP='D:\Applications\Scoop'
[Environment]::SetEnvironmentVariable('SCOOP', $env:SCOOP, 'User')

全局安装目录:

1
2
$env:SCOOP_GLOBAL='F:\GlobalScoopApps'
[Environment]::SetEnvironmentVariable('SCOOP_GLOBAL', $env:SCOOP_GLOBAL, 'Machine')

常用命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scoop install  git
scoop install sudo # 申请管理员权限,和linux下sudo命令相似,全局安装必备
sudo scoop install -g python # 其实大部分我都推荐全局安装

scoop update latex # 更新
scoop update -g hugo # 更新全局安装的软件
scoop update * # 更新所有
scoop uninstall curl # 卸载
scoop uninstall -g gcc # 卸载全局安装的软件


scoop search Source-Han-Mono # 搜索
scoop info miniconda3 # 软件信息
scoop home pwsh # 打开软件主页

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
scoop help
Usage: scoop <command> [<args>]

Available commands are listed below.

Type 'scoop help <command>' to get more help for a specific command.

Command Summary
------- -------
alias Manage scoop aliases
bucket Manage Scoop buckets
cache Show or clear the download cache
cat Show content of specified manifest.
checkup Check for potential problems
cleanup Cleanup apps by removing old versions
config Get or set configuration values
create Create a custom app manifest
depends List dependencies for an app, in the order they'll be installed
download Download apps in the cache folder and verify hashes
export Exports installed apps, buckets (and optionally configs) in JSON format
help Show help for a command
hold Hold an app to disable updates
home Opens the app homepage
import Imports apps, buckets and configs from a Scoopfile in JSON format
info Display information about an app
install Install apps
list List installed apps
prefix Returns the path to the specified app
reset Reset an app to resolve conflicts
search Search available apps
shim Manipulate Scoop shims
status Show status and check for new app versions
unhold Unhold an app to enable updates
uninstall Uninstall an app
update Update apps, or Scoop itself
virustotal Look for app's hash or url on virustotal.com
which Locate a shim/executable (similar to 'which' on Linux)

安卓开发—天气


技术选型

  1. ViewPager 用来展示切换多个城市天气情况
  2. SwipeRefreshLayout 下拉刷新天气
  3. ScrollView 滑动展示天气详情
  4. 参考第一行代码中CoolWeather

页面布局

天气页面的粗略布局如下:
主要为SwipeRefreshLayout嵌套ViewPager

<FrameLayout>
    //展示背景图片
    <ImageView/>
    <LinearLayout>
        <LinearLayout/>
        //下拉刷新
        <SwipeRefreshLayout>	
            //展示天气页面
            <ViewPager/>
        </SwipeRefreshLayout>
    </LinearLayout>
</FrameLayout>

下面为viewpager的布局,ScrollView用来滑动展示单个城市的天气详情,LinearLayout里展示天气详情

<FrameLayout>
    //纵向滑动显示天气详情
    <ScrollView>
        <LinearLayout>
            <include layout="@layout/now" />
            <include layout="@layout/forecast" />
            <include layout="@layout/lifestyle"/>
        </LinearLayout>
    </ScrollView>
</FrameLayout>

遇到的问题

ViewPager和SwipeRefreshLayout发生滑动冲突

  • 原因: SwipeRefreshLayout的onIntercept方法发现是在这里拦截了滑动事件,无法传递到viewpager里,假如无法像直线一样横向滑动出现了纵向偏差就会触发SwipeRefreshLayout

  • 解决思路:下拉刷新只在纵向滑动触发,那么在SwipeRefreshLayout里只拦截纵向滑动

      import android.content.Context;
      import android.util.AttributeSet;
      import android.view.MotionEvent;
      import android.view.ViewConfiguration;
      
      import androidx.annotation.NonNull;
      import androidx.swiperefreshlayout.widget.SwipeRefreshLayout;
      
      public class PocketSwipeRefreshLayout extends SwipeRefreshLayout {
          private float mStartX = 0;
          private float mStartY = 0;
      
          //记录Viewpager是否被拖拉
          private boolean mIsVpDrag;
          private final int mTouchSlop;
          public PocketSwipeRefreshLayout(@NonNull Context context) {
              super(context);
              mTouchSlop = ViewConfiguration.get(context).getScaledTouchSlop();
          }
      
          public PocketSwipeRefreshLayout(@NonNull Context context, AttributeSet attrs) {
              super(context,attrs);
              mTouchSlop = ViewConfiguration.get(context).getScaledTouchSlop();
          }
      
          @Override
          public boolean onInterceptTouchEvent(MotionEvent ev) {
              int action = ev.getAction();
              switch (action){
                  case MotionEvent.ACTION_DOWN:
                      mStartX = ev.getX();
                      mStartY = ev.getY();
                      mIsVpDrag = false;
                      break;
                  case MotionEvent.ACTION_MOVE:
                      //如果viewpager正在拖拽,则不拦截viewpager事件
                      if (mIsVpDrag){
                          return false;
                      }
                      float x = ev.getX();
                      float y = ev.getY();
                      float distanceX = Math.abs(x - mStartX);
                      float distanceY = Math.abs(y - mStartY);
                      //如果滑动x位移大于y,不拦截viewpager事件
                      if (distanceX > mTouchSlop && distanceX>distanceY){
                          mIsVpDrag = true;
                          return false;
                      }
                      break;
                  case MotionEvent.ACTION_UP:
                  case MotionEvent.ACTION_CANCEL:
                      mIsVpDrag = true;
                      break;
                  default:
                      break;
              }
              return super.onInterceptTouchEvent(ev);
      
          }
      }
    

SwipeRefreshLayout与ScrollView发生下滑冲突

  • 情况说明:在未采用ViewPager时,即只展示一个城市天气时,SwipeRefreshLayout与ScrollView并不会发生下滑冲突,但在采用ViewPager后会发生冲突,导致浏览到页面下部后却触发SwipeRefreshLayout无法向上滑动页面

  • 原因:SwipeRefreshLayout拦截了纵向滑动事件,但为什么未采用viewpager是不会发生冲突还未明白

  • 解决思路:当且仅当滑动到顶部时才需要触发下拉刷新,那么在纵坐标未在顶部时屏蔽SwipeRefreshLayout

      // 初始化各控件
      ScrollView weatherLayout = (ScrollView) view.findViewById(R.id.weather_layout);
      //解决SwipeRefreshLayout与ScrollView的下拉冲突
      weatherLayout.setOnScrollChangeListener(new View.OnScrollChangeListener() {
          @Override
          public void onScrollChange(View v, int scrollX, int scrollY, int oldScrollX, int oldScrollY) {
              if (swipeRefreshLayout != null){
                  swipeRefreshLayout.setEnabled(scrollY==0);
              }
          }
      });
    

ViewPager动态更新页面

  • 情况说明:在下拉刷新时,需要去重新得到城市的天气情况并展示在UI中。

  • 解决思路:PagerAdapter可以通过调用notifyDataSetChanged()方法实现数据集的刷新。
    viewpager的刷新过程是这样的:在每次调用notifyDataSetChanged()时,都会激活getItemPosition(Object object)方法,该方法会遍历viewpager的所有item,为每个item返回一个状态值(POSITION_NONE/POSITION_UNCHANGED),如果是none,那么该item会被destroyItem(ViewGroup container, int position, Object object)方法remove掉,然后重新加载,如果是unchanged,就不会重新加载,默认是unchanged,所以如果我们不重写getItemPosition(Object object),就无法看到刷新效果。

    那么为了只刷新单个页面,我们重写getItemPosition方法,并且为了确定需要刷新哪个页面,给每个view添加一个tag。

      viewPager = (ViewPager) findViewById(R.id.weather_pager);
      LayoutInflater layoutInflater = getLayoutInflater();
      for (int i=0;i<choosedCountyList.size(); i++){
          View view = layoutInflater.inflate(R.layout.weather_layout, null);
          view.setTag(i);
          String countyName = choosedCountyList.get(i);
          fillView(view, countyName,true);
          pageList.add(view);
      }
      adapter = new PagerAdapter() {
          @Override
          public int getCount() {
              return pageList.size();
          }
    
          @Override
          public int getItemPosition(@NonNull Object object) {
              //更新其中一个页面
              View view = (View) object;
              int currentPagerIdx = viewPager.getCurrentItem();
              if (currentPagerIdx == (Integer) view.getTag()) {
                  return POSITION_NONE;
              } else {
                  return POSITION_UNCHANGED;
              }
        
          }
    
          @Override
          public boolean isViewFromObject(@NonNull View view, @NonNull Object object) {
              return view==object;
          }
    
          @Override
          public void destroyItem(@NonNull ViewGroup container, int position, @NonNull Object object) {
              container.removeView(pageList.get(position));
          }
    
          @NonNull
          @Override
          public Object instantiateItem(@NonNull ViewGroup container, int position) {
              container.addView(pageList.get(position));
              return pageList.get(position);
          }
    
    
      };
    

ViewPager页添加或减少城市

  • 情况说明:
    • 当出现删除城市时,即删除viewPager中的view,如果直接pageList中remove页面后使用adapter.notifyDataSetChanged()来通通知视图改变,这会出现超出索引的问题。
    • 当出现增加城市,一般不会出现问题。
    • 当出现城市数量不变但是城市发生了改变,会出现显示城市错误的情况,仍然显示原来的城市天气情况。
  • 原因:
    • destroyItem方法中采用了位置索引的方式来删除页面,但是因为pageList中remove了页面,导致索引超出了。
    • 无问题
    • 因为viewPager会缓存当前页面的左右页面(3个页面),这会导致仍然显示原来的城市
  • 解决思路:
    • destroyItem方法将object转为view后删除的方式来解决索引问题。
    • 清空pageList,重新添加城市,此时需要注意因为上面选择了只更新当前页面,但现在需要更新所有页面,设置标志位来辨别更新一个还是更新所有。

主要的代码变动如下:

//改变适配器
adapter = new PagerAdapter() {
    @Override
    public int getCount() {
        return pageList.size();
    }

    @Override
    public int getItemPosition(@NonNull Object object) {
        if (updateStatu == UPDATE_ONE_PAGE){
            //更新其中一个页面
            View view = (View) object;
            int currentPagerIdx = viewPager.getCurrentItem();
            if (currentPagerIdx == (Integer) view.getTag()) {
                return POSITION_NONE;
            } else {
                return POSITION_UNCHANGED;
            }
        } else if (updateStatu == UPDATE_ALL_PAGE){
            //更新所有页面,目的为去除缓存
            return POSITION_NONE;
       }
        return POSITION_NONE;
    }
}


//在城市列表方式变化是重新生成页面
//当前变化
if (!equalList(curList, choosedCountyList)){
    LayoutInflater layoutInflater = getLayoutInflater();
    pageList.clear();
    for (int i=0; i< curList.size();i++){
        //以前的没有,新增的城市,新增页面
        View view = layoutInflater.inflate(R.layout.weather_layout, null);
        view.setTag(i);
        String countyName = curList.get(i);
        fillView(view, countyName,false);
        pageList.add(i, view);
    }
    updateStatu = UPDATE_ALL_PAGE;
    adapter.notifyDataSetChanged();
}
//一定要在选择页面前更新城市列表,不然增加城市时onPageSelected出现超过索引的问题
choosedCountyList = curList;
viewPager.setCurrentItem(position);

/**
 * 更新当前页天气
 */
private void updatePageView(){
    updateStatu = UPDATE_ONE_PAGE;
    View view = pageList.get(position);
    String countyName = choosedCountyList.get(position);
    fillView(view, countyName, true);
    adapter.notifyDataSetChanged();
}

Weather展示

  • 情况:默认情况每次都会生成新的活动,但是天气展示页面只需要一个就够

  • 解决思路: 在manifest文件中设置WeatherActivity为singleTask

      <activity android:name=".WeatherActivity" android:launchMode="singleTask"/>
    

从城市添加页面跳转到Weather页面展示当前选中的城市

  • 情况:从从城市添加页面跳转到Weather页面时默认显示之前Weather页面显示的城市情况
  • 解决思路:在prefs = PreferenceManager.getDefaultSharedPreferences (this)中保存当前选中的城市,随后在Weather活动中取出,根据城市名得到所在页面的索引,设置为当前页。viewPager.setCurrentItem(position);

代码展示

<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
    package="com.coolweather.android">

    <!-- 这个权限用于进行网络定位 -->
    <uses-permission android:name="android.permission.ACCESS_COARSE_LOCATION" /> <!-- 这个权限用于访问GPS定位 -->
    <uses-permission android:name="android.permission.ACCESS_FINE_LOCATION" /> <!-- 用于访问wifi网络信息,wifi信息会用于进行网络定位 -->
    <uses-permission android:name="android.permission.ACCESS_WIFI_STATE" /> <!-- 获取运营商信息,用于支持提供运营商信息相关的接口 -->
    <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" /> <!-- 这个权限用于获取wifi的获取权限,wifi信息会用来进行网络定位 -->
    <uses-permission android:name="android.permission.CHANGE_WIFI_STATE" /> <!-- 写入扩展存储,向扩展卡写入数据,用于写入离线定位数据 -->
    <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" /> <!-- 访问网络,网络定位需要上网 -->
    <uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.READ_PHONE_STATE" />

    <application
        android:name="org.litepal.LitePalApplication"
        android:allowBackup="true"
        android:icon="@mipmap/ic_launcher"
        android:label="@string/app_name"
        android:roundIcon="@mipmap/ic_launcher_round"
        android:supportsRtl="true"
        android:theme="@style/AppTheme">
        <service
            android:name=".AutoUpdateService"
            android:enabled="true"
            android:exported="true"></service>

        <meta-data
            android:name="com.baidu.lbsapi.API_KEY"
            android:value="xxxxxx" />

        <activity android:name=".ChooseAreaActivity" />
        <activity android:name=".CountyChoosedActivity" />

        <activity android:name=".WeatherActivity"
            android:launchMode="singleTask"/>

        <activity android:name=".MainActivity">
            <intent-filter>
                <action android:name="android.intent.action.MAIN" />
                <category android:name="android.intent.category.LAUNCHER" />
            </intent-filter>
        </activity>

        <service
            android:name="com.baidu.location.f"
            android:enabled="true"
            android:process=":remote" />
    </application>

</manifest>

MainActivity

package com.coolweather.android;

import androidx.annotation.NonNull;
import androidx.appcompat.app.AppCompatActivity;
import androidx.core.app.ActivityCompat;
import androidx.core.content.ContextCompat;

import android.Manifest;
import android.content.Intent;
import android.content.SharedPreferences;
import android.content.pm.PackageManager;
import android.icu.text.UnicodeSetSpanner;
import android.os.Bundle;
import android.preference.PreferenceManager;
import android.widget.Toast;

import com.baidu.location.BDLocation;
import com.baidu.location.BDLocationListener;
import com.baidu.location.LocationClient;
import com.baidu.location.LocationClientOption;
import com.baidu.mapapi.SDKInitializer;
import com.coolweather.android.db.ChoosedCounty;
import com.coolweather.android.util.BDLocationUtil;
import com.coolweather.android.util.Utility;

import java.util.ArrayList;
import java.util.List;

public class MainActivity extends AppCompatActivity {


    private LocationClient locationClient;


    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        final SharedPreferences prefs = PreferenceManager.getDefaultSharedPreferences(this);
        if (prefs.getString("weather",null)!=null) {
            String countyName = prefs.getString("weatherCityName", "北京");
            SharedPreferences.Editor edit = prefs.edit();
            edit.putString("curCounty", countyName);
            edit.apply();
            Utility.saveChoosedCounty(countyName);
            Intent intent = new Intent(this, WeatherActivity.class);
            startActivity(intent);
            finish();
        } else {
            locationClient = new LocationClient(getApplicationContext());
            locationClient.registerLocationListener(new BDLocationListener() {
                @Override
                public void onReceiveLocation(final BDLocation location) {
                    String district = location.getDistrict();
                    if (district == null){
                        runOnUiThread(new Runnable() {
                            @Override
                            public void run() {
                                Toast.makeText(MainActivity.this,
                                        "无法获取定位,重置定位为北京",
                                        Toast.LENGTH_SHORT).show();
                            }
                        });
                        district = "北京";
                    }
                    locationClient.stop();
                    Utility.saveChoosedCounty(district);
                    SharedPreferences.Editor edit = prefs.edit();
                    edit.putString("curCounty", district);
                    edit.apply();
                    Intent intent = new Intent(MainActivity.this, WeatherActivity.class);
                    startActivity(intent);
                    finish();
                }
            });
            SDKInitializer.initialize(getApplicationContext());
            //申请权限
            if (requestPermission()){
                requestLocation();
            }

        }
    }

    private void requestLocation(){
        LocationClientOption option = new LocationClientOption();
        option.setIsNeedAddress(true);
        locationClient.setLocOption(option);
        locationClient.start();
    }

    /**
     * 请求权限
     */
    private boolean requestPermission(){
        List<String> permissionList = new ArrayList<>();
        if (ContextCompat.checkSelfPermission(MainActivity.this, Manifest.
                permission.ACCESS_FINE_LOCATION)!= PackageManager.PERMISSION_GRANTED) {
            permissionList.add(Manifest.permission.ACCESS_FINE_LOCATION);
        }
        if (ContextCompat.checkSelfPermission(MainActivity.this, Manifest.
                permission.WRITE_EXTERNAL_STORAGE)!= PackageManager.PERMISSION_GRANTED) {
            permissionList.add(Manifest.permission.WRITE_EXTERNAL_STORAGE);
        }
        if (ContextCompat.checkSelfPermission(MainActivity.this,
                Manifest.permission.ACCESS_COARSE_LOCATION)!= PackageManager.PERMISSION_GRANTED){
            permissionList.add(Manifest.permission.ACCESS_COARSE_LOCATION);
        }
        if (ContextCompat.checkSelfPermission(MainActivity.this,
                Manifest.permission.READ_PHONE_STATE) != PackageManager.PERMISSION_GRANTED){
            permissionList.add(Manifest.permission.READ_PHONE_STATE);
        }
        if (!permissionList.isEmpty()) {
            String [] permissions = permissionList.toArray(new String[permissionList.
                    size()]);
            ActivityCompat.requestPermissions(MainActivity.this, permissions, 1);
            return false;
        }
        return true;
    }

    @Override
    public void onRequestPermissionsResult(int requestCode, @NonNull String[] permissions, @NonNull int[] grantResults) {
        switch (requestCode) {
            case 1:
                if (grantResults.length > 0) {
                    for (int result : grantResults) {
                        if (result != PackageManager.PERMISSION_GRANTED) {
                            Toast.makeText(this, "必须同意所有权限才能使用本程序",
                                    Toast.LENGTH_SHORT).show();
                            finish();
                            return;
                        }
                    }
                    requestLocation();
                } else {
                    Toast.makeText(this, "发生未知错误", Toast.LENGTH_SHORT).show();
                }
                break;
            default:
        }
    }


}

WeatherActivity

package com.coolweather.android;

import androidx.annotation.NonNull;
import androidx.annotation.RequiresApi;
import androidx.appcompat.app.AppCompatActivity;
import androidx.swiperefreshlayout.widget.SwipeRefreshLayout;
import androidx.viewpager.widget.PagerAdapter;
import androidx.viewpager.widget.ViewPager;

import android.annotation.SuppressLint;
import android.content.Intent;
import android.content.SharedPreferences;
import android.graphics.Color;
import android.os.Build;
import android.os.Bundle;
import android.preference.PreferenceManager;
import android.util.ArraySet;
import android.util.Log;
import android.view.LayoutInflater;
import android.view.View;
import android.view.ViewGroup;
import android.widget.Button;
import android.widget.ImageView;
import android.widget.LinearLayout;
import android.widget.ScrollView;
import android.widget.TextView;
import android.widget.Toast;

import com.bumptech.glide.Glide;
import com.coolweather.android.db.ChoosedCounty;
import com.coolweather.android.gson.Forecast;
import com.coolweather.android.gson.Lifestyle;
import com.coolweather.android.gson.Weather;
import com.coolweather.android.util.HttpUtil;
import com.coolweather.android.util.Utility;
import com.coolweather.android.view.PocketSwipeRefreshLayout;


import org.litepal.crud.DataSupport;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Response;

@RequiresApi(api = Build.VERSION_CODES.M)
public class WeatherActivity extends AppCompatActivity {

    /**
     * 采用singleTask模式,只存在一个weather活动
     *
     */
    private static final String TAG = "WeatherActivity";

    private PocketSwipeRefreshLayout swipeRefreshLayout;
    private ImageView bingPIcImg;
    private Button navButton;
    private TextView titleCityText;
    private LinearLayout dotLayout;

    private SharedPreferences prefs;

    private ViewPager viewPager;
    private PagerAdapter adapter;
    //保存每个页面view
    private ArrayList<View> pageList = new ArrayList<>();
    private List<ImageView> dotImageList = new ArrayList<>();
    //当前页索引
    private int position;
    //当前显示的城市
    private String curCountyName;

    //所有选中的城市
    private List<String> choosedCountyList = new ArrayList<>();
    //城市对应的位置
    private HashMap<String, Integer> countyIndex = new HashMap<>();

    //更新一个页面
    private int UPDATE_ONE_PAGE = 0;
    //更新所有页面
    private int UPDATE_ALL_PAGE = 1;
    //更新标记,用来切换下拉刷新更新一个页面 和 减少页面更新所有页面来去除缓存
    private int updateStatu = UPDATE_ONE_PAGE;



    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_weather);
        //实现状态栏配色统一
        if (Build.VERSION.SDK_INT>=21){
            View decorView = getWindow().getDecorView();
            decorView.setSystemUiVisibility(View.SYSTEM_UI_FLAG_LAYOUT_FULLSCREEN|
                    View.SYSTEM_UI_FLAG_LAYOUT_STABLE);
            getWindow().setStatusBarColor(Color.TRANSPARENT);
        }
        setContentView(R.layout.activity_weather);
        //数据初始化-- 将传入的数据赋值到对应的位置
        choosedCountyList = initChoosedCounties();
        prefs = PreferenceManager.getDefaultSharedPreferences
                (this);
        curCountyName = prefs.getString("curCounty",choosedCountyList.get(0));
        position = getPositionFromName(curCountyName, choosedCountyList);

        //圆点来显示当前页面的位置
        dotLayout = (LinearLayout) findViewById(R.id.dot_layout);
        for (int i = 0; i < choosedCountyList.size(); i++) {
            ImageView imageView = new ImageView(WeatherActivity.this);
            imageView.setLayoutParams(new ViewGroup.LayoutParams(30, 30));
            imageView.setPadding(20, 0, 20, 0);
            if (i == position) {
                // 默认选中第一张图片
                imageView.setBackgroundResource(R.drawable.dot_focused);
            } else {
                imageView.setBackgroundResource(R.drawable.dot_normal);
            }
            dotImageList.add(imageView);
            dotLayout.addView(imageView);
        }


        //下拉刷新
        swipeRefreshLayout = (PocketSwipeRefreshLayout) findViewById(R.id.swipe_refresh);
        swipeRefreshLayout.setColorSchemeResources(R.color.colorPrimary);
        swipeRefreshLayout.setOnRefreshListener(new PocketSwipeRefreshLayout.OnRefreshListener() {
            @Override
            public void onRefresh() {
                updatePageView();
                Toast.makeText(WeatherActivity.this, "刷新",
                        Toast.LENGTH_SHORT).show();
                swipeRefreshLayout.setRefreshing(false);
            }
        });
        //tupian
        bingPIcImg = (ImageView) findViewById(R.id.bing_pic_img);
        String bingPic = prefs.getString("bing_pic", null);
        if (bingPic != null){
            Glide.with(this).load(bingPic).into(bingPIcImg);
        }else {
            loadBingPIc();
        }
        //添加按钮
        navButton = (Button) findViewById(R.id.nav_button);
        navButton.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                Intent intent = new Intent(WeatherActivity.this,
                        CountyChoosedActivity.class);
                startActivity(intent);
            }
        });

        titleCityText = (TextView) findViewById(R.id.title_city);
        titleCityText.setText(curCountyName);

        //初始化pager,每个页面都是一个城市的天气
        viewPager = (ViewPager) findViewById(R.id.weather_pager);
        LayoutInflater layoutInflater = getLayoutInflater();
        for (int i=0;i<choosedCountyList.size(); i++){
            View view = layoutInflater.inflate(R.layout.weather_layout, null);
            view.setTag(i);
            String countyName = choosedCountyList.get(i);
            fillView(view, countyName,true);
            pageList.add(view);
        }
        adapter = new PagerAdapter() {
            @Override
            public int getCount() {
                return pageList.size();
            }

            @Override
            public int getItemPosition(@NonNull Object object) {
                if (updateStatu == UPDATE_ONE_PAGE){
                    //更新其中一个页面
                    View view = (View) object;
                    int currentPagerIdx = viewPager.getCurrentItem();
                    if (currentPagerIdx == (Integer) view.getTag()) {
                        return POSITION_NONE;
                    } else {
                        return POSITION_UNCHANGED;
                    }
                }
                else if (updateStatu == UPDATE_ALL_PAGE){
                    //更新所有页面,目的为去除缓存
                    return POSITION_NONE;
                }
                return POSITION_NONE;
            }

            @Override
            public boolean isViewFromObject(@NonNull View view, @NonNull Object object) {
                return view==object;
            }

            @Override
            public void destroyItem(@NonNull ViewGroup container, int position, @NonNull Object object) {
                container.removeView((View) object);
            }

            @NonNull
            @Override
            public Object instantiateItem(@NonNull ViewGroup container, int position) {
                container.addView(pageList.get(position));
                return pageList.get(position);
            }
        };
        viewPager.setAdapter(adapter);
        //侦测当前页面位置
        viewPager.setOnPageChangeListener(new ViewPager.OnPageChangeListener() {
            @Override
            public void onPageScrolled(int position, float positionOffset, int positionOffsetPixels) {
            }

            @Override
            public void onPageSelected(int position) {
                WeatherActivity.this.position = position;
                curCountyName = choosedCountyList.get(position);
                titleCityText.setText(curCountyName);
                for (int i=0;i<dotImageList.size(); i++){
                    dotImageList.get(i).setBackgroundResource(R.drawable.dot_normal);
                }
                dotImageList.get(position).setBackgroundResource(R.drawable.dot_focused);
            }

            @Override
            public void onPageScrollStateChanged(int state) {

            }
        });

        viewPager.setCurrentItem(position);
        viewPager.setOffscreenPageLimit(8);

    }

    private int getPositionFromName(String name,List<String> list){
        for (int i = 0; i < list.size(); i++) {
            if (name.equals(list.get(i))){
                return i;
            }
        }
        return -1;
    }

    public  boolean equalList(List<String> a, List<String> b) {
        if (a==b) return true;
        if (a==null || b==null) return false;
        int length = a.size();
        if (b.size() != length) return false;
        for (int i=0; i<length; i++)
            if (!a.get(i).equals(b.get(i))) return false;
        return true;
    }

    /**
     * 当重新激活后,
     * 1)比较选择城市是否发生改变,若无,显示当前选中的城市页面
     * 2 发生改变,添加或减少页面
     */
    @Override
    protected void onRestart() {
        super.onRestart();
        //当前选择的城市列表
        List<String> curList = initChoosedCounties();
        curCountyName = prefs.getString("curCounty", curList.get(0));
        position = getPositionFromName(curCountyName, curList);
        //改变圆点位置
        dotImageList.clear();
        dotLayout.removeAllViews();
        for (int i = 0; i < curList.size(); i++) {
            ImageView imageView = new ImageView(WeatherActivity.this);
            imageView.setLayoutParams(new ViewGroup.LayoutParams(30, 30));
            imageView.setPadding(20, 0, 20, 0);
            dotImageList.add(imageView);
            dotLayout.addView(imageView);
        }

        //当前变化
        if (!equalList(curList, choosedCountyList)){
            LayoutInflater layoutInflater = getLayoutInflater();
            pageList.clear();
            for (int i=0; i< curList.size();i++){
                //以前的没有,新增的城市,新增页面
                View view = layoutInflater.inflate(R.layout.weather_layout, null);
                view.setTag(i);
                String countyName = curList.get(i);
                fillView(view, countyName,false);
                pageList.add(i, view);
            }
            updateStatu = UPDATE_ALL_PAGE;
            adapter.notifyDataSetChanged();

        }
        //一定要在选择页面前更新城市列表,不然增加城市时onPageSelected出现超过索引的问题
        choosedCountyList = curList;
        viewPager.setCurrentItem(position);
    }

    private List<String> initChoosedCounties(){
        List<ChoosedCounty> all = DataSupport.findAll(ChoosedCounty.class);
        List<String> curList = new ArrayList<>();
        for (int i=0; i<all.size(); i++){
            curList.add(all.get(i).getCountyName());
        }
        return curList;
    }

    /**
     * 根据城市名获取内容,并填充view
     * @param countyName
     * @param isUpdate 是否更新天气数据
     */
    private void fillView(View view, String countyName, boolean isUpdate){
        if (isUpdate){
            requestWeather(view,countyName);
        }else {
            String weatherString = prefs.getString(countyName, null);
            Weather weather = null;
            if (weatherString == null){
                // 无缓存时去服务器查询天气
                requestWeather(view, countyName);
            }else {
                weather = Utility.handleWeatherResponse(weatherString);
                showWeatherInfo(view, weather);
            }
        }

    }

    /**
     * 根据城市名来查询天气
     * 支持中英文与拼音,
     * @param city
     */
    public void requestWeather(final View view, final String city){
        String weatherUrl = "https://free-api.heweather.net/s6/weather/?location="
                +city+"&key=自己和风天气的key";
        HttpUtil.sendOkHttpRequest(weatherUrl, new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                e.printStackTrace();
                runOnUiThread(new Runnable() {
                    @Override
                    public void run() {
                        Toast.makeText(WeatherActivity.this, "获取天气信息失败",
                                Toast.LENGTH_SHORT).show();
                        swipeRefreshLayout.setRefreshing(false);
                    }
                });
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                final String responseText = response.body().string();
                final Weather weather = Utility.handleWeatherResponse(responseText);
                runOnUiThread(new Runnable() {
                    @Override
                    public void run() {
                        if (weather!=null && "ok".equals(weather.status)){
                            @SuppressLint("CommitPrefEdits")
                            SharedPreferences.Editor edit = prefs.edit();
                            edit.putString(city, responseText);
                            edit.apply();
                            showWeatherInfo(view, weather);
                        }else {
                            Toast.makeText(WeatherActivity.this,"获取天气信息失败",
                                    Toast.LENGTH_SHORT).show();
                        }
                        swipeRefreshLayout.setRefreshing(false);
                    }
                });
            }
        });
        loadBingPIc();
    }

    /**
     * 处理并展示Weather实体类数据信息
     * @param weather
     */
    private void showWeatherInfo(View view, Weather weather){
        // 初始化各控件
        ScrollView weatherLayout = (ScrollView) view.findViewById(R.id.weather_layout);
        //解决SwipeRefreshLayout与ScrollView的下拉冲突
        weatherLayout.setOnScrollChangeListener(new View.OnScrollChangeListener() {
            @Override
            public void onScrollChange(View v, int scrollX, int scrollY, int oldScrollX, int oldScrollY) {
                if (swipeRefreshLayout != null){
                    swipeRefreshLayout.setEnabled(scrollY==0);
                }
            }
        });
        TextView updateTimeText = (TextView) view.findViewById(R.id.update_time);
        TextView degreeText = (TextView) view.findViewById(R.id.degree_text);
        TextView weatherInfoText = (TextView) view.findViewById(R.id.weather_info_text);
        LinearLayout forecastLayout = (LinearLayout) view.findViewById(R.id.forecast_layout);
        LinearLayout lifestyleLayout  = (LinearLayout) view.findViewById(R.id.lifestyle_layout);
        String updateTime = weather.update.loc;
        String degree = weather.now.tmp+"℃";
        String weatherInfo = weather.now.cond_txt;
        //填充内容
        updateTimeText.setText(updateTime);
        degreeText.setText(degree);
        weatherInfoText.setText(weatherInfo);
        forecastLayout.removeAllViews();
        for (Forecast forecast:weather.daily_forecast){
            View viewN = LayoutInflater.from(this).inflate(R.layout.forecast_item,
                    forecastLayout, false);
            TextView dateText = (TextView) viewN.findViewById(R.id.date_text);
            TextView infoText = (TextView) viewN.findViewById(R.id.info_text);
            TextView minText = (TextView) viewN.findViewById(R.id.min_text);
            TextView maxText = (TextView) viewN.findViewById(R.id.max_text);

            dateText.setText(forecast.date);
            infoText.setText(forecast.cond_txt_d);
            minText.setText(forecast.tmp_min);
            maxText.setText(forecast.tmp_max);
            forecastLayout.addView(viewN);
        }
        lifestyleLayout.removeAllViews();
        for (Lifestyle lifestyle:weather.lifestyles){
            View viewN = LayoutInflater.from(this).inflate(R.layout.lifestyle_item,
                    lifestyleLayout, false);
            TextView lifestyle_text_text = (TextView) viewN.findViewById(R.id.lifestyle_text_text);
            TextView lifestyle_type_text = (TextView) viewN.findViewById(R.id.lifestyle_type_text);
            lifestyle_type_text.setText(Utility.translate(lifestyle.type) +" "+ lifestyle.brf);
            lifestyle_text_text.setText(lifestyle.txt);
            lifestyleLayout.addView(viewN);
        }
        weatherLayout.setVisibility(View.VISIBLE);
    }

    /**
     * 更新当前页天气
     */
    private void updatePageView(){
        updateStatu = UPDATE_ONE_PAGE;
        View view = pageList.get(position);
        String countyName = choosedCountyList.get(position);
        fillView(view, countyName, true);
        adapter.notifyDataSetChanged();
    }

    private void loadBingPIc(){
        String requestBingPicUrl = "http://guolin.tech/api/bing_pic";
        HttpUtil.sendOkHttpRequest(requestBingPicUrl, new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                e.printStackTrace();
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                final String pic = response.body().string();
                SharedPreferences.Editor edit = PreferenceManager.getDefaultSharedPreferences(WeatherActivity.this).edit();
                edit.putString("bing_pic", pic);
                edit.apply();
                runOnUiThread(new Runnable() {
                    @Override
                    public void run() {
                        Glide.with(WeatherActivity.this).load(pic).into(bingPIcImg);
                    }
                });
            }
        });


    }

}

CountyChoosedActivity
展示已选中的城市列表和删除城市列表

package com.coolweather.android;

import androidx.appcompat.app.AlertDialog;
import androidx.appcompat.app.AppCompatActivity;
import androidx.fragment.app.Fragment;
import androidx.fragment.app.FragmentManager;
import androidx.fragment.app.FragmentTransaction;

import android.annotation.SuppressLint;
import android.content.DialogInterface;
import android.content.Intent;
import android.content.SharedPreferences;
import android.os.Bundle;
import android.preference.PreferenceManager;
import android.util.ArraySet;
import android.util.Log;
import android.view.View;
import android.widget.AdapterView;
import android.widget.ArrayAdapter;
import android.widget.Button;
import android.widget.ListView;
import android.widget.TextView;
import android.widget.Toast;

import com.baidu.location.BDLocation;
import com.baidu.location.BDLocationListener;
import com.baidu.location.LocationClient;
import com.baidu.location.LocationClientOption;
import com.coolweather.android.db.ChoosedCounty;
import com.coolweather.android.util.BDLocationUtil;
import com.coolweather.android.util.Utility;

import org.litepal.crud.DataSupport;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

import static org.litepal.LitePalApplication.getContext;

public class CountyChoosedActivity extends AppCompatActivity {

    private static final String TAG = "CountyChoosedActivity";
    private SharedPreferences prefs;
    private Button addCountyButton;
    private ListView listView;
    private Button locationCityButton;
    private ArrayAdapter<String> adapter;
    private List<String> countyList = new ArrayList<>();
    private List<ChoosedCounty> countyListDB = new ArrayList<>();

    private LocationClient locationClient;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_county_choosed);
        prefs = PreferenceManager.getDefaultSharedPreferences(this);

        queryChoosedCountyList();
        addCountyButton = (Button) findViewById(R.id.add_county_button);
        listView = (ListView) findViewById(R.id.county_list_choosed_view);
        locationCityButton = (Button) findViewById(R.id.location_city_button);

        startLocation();

        addCountyButton.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                Intent intent = new Intent(getContext(), ChooseAreaActivity.class);
                startActivity(intent);
                finish();
            }
        });

        adapter = new ArrayAdapter<String>(CountyChoosedActivity.this,
                android.R.layout.simple_list_item_1, countyList);
        listView.setAdapter(adapter);
        listView.setOnItemClickListener(new AdapterView.OnItemClickListener() {
            @Override
            public void onItemClick(AdapterView<?> parent, View view, int position, long id) {
                String s = countyList.get(position);
                SharedPreferences.Editor edit = prefs.edit();
                edit.putString("curCounty", s);
                edit.apply();
                Intent intent = new Intent(getContext(), WeatherActivity.class);
                startActivity(intent);
                finish();
            }
        });
        listView.setOnItemLongClickListener(new AdapterView.OnItemLongClickListener() {
            @Override
            public boolean onItemLongClick(AdapterView<?> parent, View view, final int position, long id) {
                //定义AlertDialog.Builder对象,当长按列表项的时候弹出确认删除对话框
                AlertDialog.Builder builder=new AlertDialog.Builder(CountyChoosedActivity.this);
                builder.setMessage("确定删除?");
                builder.setTitle("提示");

                //添加AlertDialog.Builder对象的setPositiveButton()方法
                builder.setPositiveButton("确定", new DialogInterface.OnClickListener() {
                    @Override
                    public void onClick(DialogInterface dialog, int which) {
                        String countyName= countyList.get(position);
                        if(countyList.remove(position)!=null){
                            DataSupport.deleteAll(ChoosedCounty.class,
                                    "countyName = ?", countyName);
                            System.out.println("success");
                        }else {
                            System.out.println("failed");
                        }
                        adapter.notifyDataSetChanged();
                        Toast.makeText(getBaseContext(), "删除列表项", Toast.LENGTH_SHORT).show();
                    }
                });

                //添加AlertDialog.Builder对象的setNegativeButton()方法
                builder.setNegativeButton("取消", new DialogInterface.OnClickListener() {
                    @Override
                    public void onClick(DialogInterface dialog, int which) {

                    }
                });

                AlertDialog alertDialog = builder.create();
                alertDialog.setCanceledOnTouchOutside(false);
                alertDialog.show();
                return true;

            }
        });

    }

    private void startLocation(){
        locationClient = new LocationClient(getApplicationContext());
        locationClient.registerLocationListener(new BDLocationListener() {
            @Override
            public void onReceiveLocation(final BDLocation location) {
                String district = location.getDistrict();
                if (district == null){
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            Toast.makeText(CountyChoosedActivity.this,
                                    "无法获取定位,重置定位为北京",
                                    Toast.LENGTH_SHORT).show();
                        }
                    });
                    district = "北京";
                    locationCityButton.setText( district+"--重新定位");
                }else {
                    locationCityButton.setText( district+"--定位");
                }
                locationClient.stop();
                final String finalDistrict = district;
                locationCityButton.setOnClickListener(new View.OnClickListener() {
                    @Override
                    public void onClick(View v) {
                        SharedPreferences.Editor edit = prefs.edit();
                        Utility.saveChoosedCounty(finalDistrict);
                        edit.putString("curCounty", finalDistrict);
                        edit.apply();
                        Intent intent = new Intent(CountyChoosedActivity.this, WeatherActivity.class);
                        startActivity(intent);
                        finish();
                    }
                });
            }
        });
        LocationClientOption option = new LocationClientOption();
        option.setIsNeedAddress(true);
        locationClient.setLocOption(option);
        locationClient.start();
    }


    private void queryChoosedCountyList(){
        countyListDB = DataSupport.findAll(ChoosedCounty.class);
        for (ChoosedCounty county:countyListDB){
            countyList.add(county.getCountyName());
        }
    }


}

ChooseAreaActivity和ChooseAreaFragment,用来新增城市

package com.coolweather.android;

import androidx.annotation.Nullable;
import androidx.appcompat.app.AppCompatActivity;

import android.os.Bundle;


public class ChooseAreaActivity extends AppCompatActivity {
    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_choose_area);
    }

}
package com.coolweather.android;

import android.app.ProgressDialog;
import android.content.Intent;
import android.content.SharedPreferences;
import android.os.Bundle;
import android.preference.PreferenceManager;
import android.view.LayoutInflater;
import android.view.View;
import android.view.ViewGroup;
import android.widget.AdapterView;
import android.widget.ArrayAdapter;
import android.widget.Button;
import android.widget.ListView;
import android.widget.TextView;
import android.widget.Toast;

import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.fragment.app.Fragment;

import com.coolweather.android.db.City;
import com.coolweather.android.db.County;
import com.coolweather.android.db.Province;
import com.coolweather.android.util.HttpUtil;
import com.coolweather.android.util.Utility;

import org.litepal.crud.DataSupport;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Response;

public class ChooseAreaFragment extends Fragment {

    private SharedPreferences prefs;
    public static final int LEVEL_PROVINCE = 0;
    public static final int LEVEL_CITY = 1;
    public static final int LEVEL_COUNTY = 2;
    private ProgressDialog progressDialog;
    private TextView titleText;
    private Button backButton;
    private ListView listView;
    private ArrayAdapter<String> adapter;
    private List<String> dataList = new ArrayList<>();
    /**
     * 省列表
     */
    private List<Province> provinceList;
    /**
     * 市列表
     */
    private List<City> cityList;
    /**
     * 县列表
     */
    private List<County> countyList;

    /**
     * 选中的省份
    */
    private Province selectedProvince;
    /**
     * 选中的城市
     */
    private City selectedCity;
    /**
     * 当前选中的级别
     */
    private int currentLevel;

    @Nullable
    @Override
    public View onCreateView(@NonNull LayoutInflater inflater, @Nullable ViewGroup container, @Nullable Bundle savedInstanceState) {
        prefs = PreferenceManager.getDefaultSharedPreferences(getContext());
        View view = inflater.inflate(R.layout.choose_area, container, false);
        titleText = (TextView)view.findViewById(R.id.title_text);
        backButton = (Button) view.findViewById(R.id.back_button);
        listView = (ListView) view.findViewById(R.id.list_view);

        adapter = new ArrayAdapter<>(getContext(), android.R.layout.simple_list_item_1, dataList);
        listView.setAdapter(adapter);
        return view;
    }

    @Override
    public void onActivityCreated(@Nullable Bundle savedInstanceState) {
        super.onActivityCreated(savedInstanceState);
        listView.setOnItemClickListener(new AdapterView.OnItemClickListener() {
            @Override
            public void onItemClick(AdapterView<?> parent, View view, int position, long id) {
                if (currentLevel == LEVEL_PROVINCE){
                    selectedProvince = provinceList.get(position);
                    queryCities();
                }else if (currentLevel == LEVEL_CITY){
                    selectedCity = cityList.get(position);
                    queryCounties();
                }else if (currentLevel== LEVEL_COUNTY){
                    String countyName = countyList.get(position).getCountyName();
                    Intent intent = new Intent(getActivity(), WeatherActivity.class);
                    //保存添加的城市
                    Utility.saveChoosedCounty(countyName);
                    SharedPreferences.Editor edit = prefs.edit();
                    edit.putString("curCounty", countyName);
                    edit.apply();
                    startActivity(intent);
                    getActivity().finish();

                }
            }
        });

        backButton.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                if (currentLevel == LEVEL_COUNTY) {
                    queryCities();
                } else if (currentLevel == LEVEL_CITY) {
                    queryProvinces();
                }else if(currentLevel == LEVEL_PROVINCE){
                    Intent intent = new Intent(getActivity(), CountyChoosedActivity.class);
                    startActivity(intent);
                    getActivity().finish();
                }
            }
        });
        //初始化省份数据
        queryProvinces();
    }

    /**
     * 查询全国所有的省, 优先从数据库查询, 如果没有查询到再去服务器上查询
    */
    private void queryProvinces() {
        titleText.setText("中国");
        backButton.setVisibility(View.VISIBLE);
        provinceList = DataSupport.findAll(Province.class);
        if (provinceList.size() > 0) {
            dataList.clear();
            for (Province province : provinceList) {
                dataList.add(province.getProvinceName());
            }
            adapter.notifyDataSetChanged();
            listView.setSelection(0);
            currentLevel = LEVEL_PROVINCE;
        } else {
            String address = "http://guolin.tech/api/china";
            queryFromServer(address, "province");
        }
    }

    /**
     * 查询选中省内所有的市, 优先从数据库查询, 如果没有查询到再去服务器上查询
    */
    private void queryCities() {
        titleText.setText(selectedProvince.getProvinceName());backButton.setVisibility(View.VISIBLE);
        cityList = DataSupport.where("provinceid = ?",
                String.valueOf(selectedProvince.getId())).find(City.class);
        if (cityList.size() > 0) {
            dataList.clear();
            for (City city : cityList) {
                dataList.add(city.getCityName());
            }
            adapter.notifyDataSetChanged();
            listView.setSelection(0);
            currentLevel = LEVEL_CITY;
        } else {
            int provinceCode = selectedProvince.getProvinceCode();
            String address = "http://guolin.tech/api/china/" + provinceCode;
            queryFromServer(address, "city");
        }
    }

    /**
     * 查询选中市内所有的县, 优先从数据库查询, 如果没有查询到再去服务器上查询
    */
    private void queryCounties() {
        titleText.setText(selectedCity.getCityName());
        backButton.setVisibility(View.VISIBLE);
        countyList = DataSupport.where("cityid = ?",
                String.valueOf(selectedCity.
                getId())).find(County.class);
        if (countyList.size() > 0) {
            dataList.clear();
            for (County county : countyList) {
                dataList.add(county.getCountyName());
            }
            adapter.notifyDataSetChanged();
            listView.setSelection(0);
            currentLevel = LEVEL_COUNTY;
        } else {
            int provinceCode = selectedProvince.getProvinceCode();
            int cityCode = selectedCity.getCityCode();
            String address = "http://guolin.tech/api/china/" + provinceCode + "/" +
                    cityCode;
            queryFromServer(address, "county");
        }
    }

    /**
     * 根据传入的地址和类型从服务器上查询省市县数据
    */
    private void queryFromServer(String address, final String type) {
        showProgressDialog();
        HttpUtil.sendOkHttpRequest(address, new Callback() {
            @Override
            public void onResponse(Call call, Response response) throws IOException {
                String responseText = response.body().string();
                boolean result = false;
                if ("province".equals(type)) {
                    result = Utility.handleProvinceResponse(responseText);
                } else if ("city".equals(type)) {
                    result = Utility.handleCityResponse(responseText,
                            selectedProvince.getId());
                } else if ("county".equals(type)) {
                    result = Utility.handleCountyResponse(responseText,
                            selectedCity.getId());
                }
                if (result) {
                    getActivity().runOnUiThread(new Runnable() {@Override
                    public void run() {
                        closeProgressDialog();
                        if ("province".equals(type)) {
                            queryProvinces();
                        } else if ("city".equals(type)) {
                            queryCities();
                        } else if ("county".equals(type)) {
                            queryCounties();
                        }
                    }
                    });
                }
            }

            @Override
            public void onFailure(Call call, IOException e) {
            // 通过runOnUiThread()方法回到主线程处理逻辑
                getActivity().runOnUiThread(new Runnable() {
                    @Override
                    public void run() {
                        closeProgressDialog();
                        Toast.makeText(getContext(), "加载失败", Toast.LENGTH_SHORT).
                                show();
                    }
                });
            }
        });
    }

    /**
     * 显示进度对话框
    */
    private void showProgressDialog() {
        if (progressDialog == null) {
            progressDialog = new ProgressDialog(getActivity());
            progressDialog.setMessage("正在加载...");
            progressDialog.setCanceledOnTouchOutside(false);
        }
        progressDialog.show();
    }
    /**
     * 关闭进度对话框
    */
    private void closeProgressDialog() {
        if (progressDialog != null) {
            progressDialog.dismiss();
        }
    }
}

AutoUpdateService后台服务,用来自动更新天气

package com.coolweather.android;

import android.app.AlarmManager;
import android.app.PendingIntent;
import android.app.Service;
import android.content.Intent;
import android.content.SharedPreferences;
import android.os.IBinder;
import android.os.SystemClock;
import android.preference.PreferenceManager;

import com.bumptech.glide.Glide;
import com.coolweather.android.db.ChoosedCounty;
import com.coolweather.android.gson.Weather;
import com.coolweather.android.util.BDLocationUtil;
import com.coolweather.android.util.HttpUtil;
import com.coolweather.android.util.Utility;

import org.litepal.crud.DataSupport;

import java.io.IOException;
import java.util.List;

import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Response;

public class AutoUpdateService extends Service {
    public AutoUpdateService() {
    }

    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        updateBingPic();
        //更新天气
        String districtLocation = BDLocationUtil.getLocationDistrict(getApplicationContext());
        updateWeather(districtLocation);
        List<ChoosedCounty> list = DataSupport.findAll(ChoosedCounty.class);
        for(ChoosedCounty county:list){
            updateWeather(county.getCountyName());
        }

        AlarmManager manager = (AlarmManager) getSystemService(ALARM_SERVICE);
        int time = 8 * 60 * 60 * 1000;
        long triggerTime = SystemClock.elapsedRealtime()+time;
        Intent intentA = new Intent(this, AutoUpdateService.class);
        PendingIntent pendingIntent = PendingIntent.getService(this, 0, intentA, 0);
        manager.cancel(pendingIntent);
        manager.set(AlarmManager.ELAPSED_REALTIME_WAKEUP, triggerTime, pendingIntent);
        return super.onStartCommand(intent, flags, startId);
    }

    private void updateWeather(final String name){
        SharedPreferences pref = PreferenceManager.getDefaultSharedPreferences(this);
        //和风天气
        String weatherUrl =  "https://free-api.heweather.net/s6/weather/?location="
                +name+"&key=自己的key";
        HttpUtil.sendOkHttpRequest(weatherUrl, new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                e.printStackTrace();
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                String reponseText = response.body().string();
                Weather weather = Utility.handleWeatherResponse(reponseText);
                if (weather!=null  && "ok".equals(weather.status)){
                    SharedPreferences.Editor edit = PreferenceManager.
                            getDefaultSharedPreferences(AutoUpdateService.this).edit();
                    edit.putString(name, reponseText);
                    edit.apply();
                }
            }
        });
    }

    private void updateBingPic(){
        String requestBingPicUrl = "http://guolin.tech/api/bing_pic";
        HttpUtil.sendOkHttpRequest(requestBingPicUrl, new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                e.printStackTrace();
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                final String pic = response.body().string();
                SharedPreferences.Editor edit = PreferenceManager.getDefaultSharedPreferences(AutoUpdateService.this).edit();
                edit.putString("bing_pic", pic);
                edit.apply();
            }
        });
    }
}

Utility和HttpUtil工具类

package com.coolweather.android.util;

import android.text.TextUtils;
import com.coolweather.android.db.ChoosedCounty;
import com.coolweather.android.db.City;
import com.coolweather.android.db.County;
import com.coolweather.android.db.Province;
import com.coolweather.android.gson.Weather;
import com.google.gson.Gson;

import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.litepal.crud.DataSupport;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Utility {

    /**
     * 解析处理服务器返回的省级数据
     */
    public static boolean handleProvinceResponse(String response){
        if (!TextUtils.isEmpty(response)){
            try{
                JSONArray allProvinces = new JSONArray(response);
                for (int i = 0; i < allProvinces.length(); i++) {
                    JSONObject provinceObject = allProvinces.getJSONObject(i);
                    Province province = new Province();
                    province.setProvinceName(provinceObject.getString("name"));
                    province.setProvinceCode(provinceObject.getInt("id"));
                    province.save();
                }
                return true;
            } catch (JSONException e) {
                e.printStackTrace();
            }

        }
        return false;
    }

    /**
     * 解析处理服务器返回的市ji数据
     * @param response
     * @param provinceId
     * @return
     */
    public static boolean handleCityResponse(String response, int provinceId){
        if (!TextUtils.isEmpty(response)) {
            try {
                JSONArray allCities = new JSONArray(response);
                for (int i = 0; i < allCities.length(); i++) {
                    JSONObject cityObject = allCities.getJSONObject(i);
                    City city = new City();
                    city.setCityName(cityObject.getString("name"));
                    city.setCityCode(cityObject.getInt("id"));
                    city.setProvinceId(provinceId);
                    city.save();
                }
                return true;
            } catch (JSONException e) {
                e.printStackTrace();
            }
        }
        return false;
    }

    /**
     * 解析和处理服务器返回的县级数据
    */
    public static boolean handleCountyResponse(String response, int cityId) {
        if (!TextUtils.isEmpty(response)) {
            try {
                JSONArray allCounties = new JSONArray(response);
                for (int i = 0; i < allCounties.length(); i++) {
                    JSONObject countyObject = allCounties.getJSONObject(i);
                    County county = new County();
                    county.setCountyName(countyObject.getString("name"));
                    county.setWeatherId(countyObject.getString("weather_id"));
                    county.setCityId(cityId);
                    county.save();
                }
                return true;
            } catch (JSONException e) {
                e.printStackTrace();
            }
        }
        return false;
    }

    public static Weather handleWeatherResponse(String response){
        try {
            JSONObject jsonObject = new JSONObject(response);
            JSONArray jsonArray = jsonObject.getJSONArray("HeWeather6");
            String weatherContent = jsonArray.getJSONObject(0).toString();
            return new Gson().fromJson(weatherContent, Weather.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    private static final Map<String, String> myMap;
    static
    {
        myMap = new HashMap<String, String>();
        myMap.put("comf","舒适度指数");
        myMap.put("cw","洗车指数");
        myMap.put("drsg","穿衣指数");
        myMap.put("flu","感冒指数");
        myMap.put("ptfc","交通指数");
        myMap.put("trav","旅游指数");
        myMap.put("sport","运动指数");
        myMap.put("uv","紫外线指数");
        myMap.put("air","空气污染扩散条件指数");
        myMap.put("ac","空调开启指数");
        myMap.put("ag","过敏指数");
        myMap.put("gl","太阳镜指数");
        myMap.put("mu","化妆指数");
        myMap.put("airc","晾晒指数");
        myMap.put("fsh","钓鱼指数");
        myMap.put("spi","防晒指数");
    }

    public static String translate(String type){
        return myMap.get(type);
    }


    public static void saveChoosedCounty(String countyName){
        //去重
        List<ChoosedCounty> all = DataSupport.findAll(ChoosedCounty.class);
        for (ChoosedCounty county: all){
            if (countyName.equals(county.getCountyName())){
                return;
            }
        }
        ChoosedCounty county = new ChoosedCounty(countyName);
        county.save();
    }

}

package com.coolweather.android.util;

import okhttp3.OkHttpClient;
import okhttp3.Request;

public class HttpUtil {

    public static void sendOkHttpRequest(String address, okhttp3.Callback callback){
        OkHttpClient client = new OkHttpClient();
        Request request = new Request.Builder().url(address).build();
        client.newCall(request).enqueue(callback);
    }
}

项目的github地址

InnoDB体系结构

后台线程

内存

InnoDB关键特性

InnoDB存储引擎关键特性:

  • 插入缓冲
  • 两次写
  • 自适应哈希索引
  • 异步IO
  • 刷新邻接页

插入缓冲

Insert Buffer

两次写

doublewrite由两部分组成,一部分是内存中的doublewrite buffer,大小为2MB,另一部分是物理磁盘上共享表空间中连续128个页,大小同样为2MB。

Mysql技术内幕(InnoDB存储引擎)

锁的类型

InnoDB存储引擎实现了如下两种标准的行级锁:

  • 共享锁(SLock),允许事务读一行数据。
  • 排他锁(XLock),允许事务删除或更新一行数据。
X S
X 不兼容 不兼容
S 不兼容 兼容

此外,InnoDB存储引擎支持多粒度(granular)锁定,这种锁定允许事务在行级上的
锁和表级上的锁同时存在。为了支持在不同粒度上进行加锁操作,InnoDB存储引擎支持
一种额外的锁方式,称之为意向锁(Intention Lock)。意向锁是将锁定的对象分为多个层
次,意向锁意味着事务希望在更细粒度(fine granularity)上进行加锁。

一致性非锁定读

致性的非锁定读(consistent nonlocking read)是指InnoDB存储引繁通过行多版本控制(multi versioning)的方式来读取当前执行时间数据库中行的数据。如果读取的行正在执行DELETE或UPDATE操作,这时读取操作不会因此去等待行上锁的释放。相反地,InnoDB存储引擎会去读取行的一个快照数据。

快照数据是指该行的之前版本的数据,该实现是通过undo段来完成。而undo用来在事务中回滚数据,因此快照数据本身是没有额外的开销。此外,读取快照数据是不要上锁的,因为没有事务需要对历史的数据进行修改操作。可以看到,非锁定读机制极大地提髙了数据库的并发性。在InnoDB存储引擎的默认设置下,这是默认的读取方式,即读取不会占用和等待表上的锁。

但是在不同事务隔离级别下,读取的方式不同,并不是在每个事务隔离级别下都是采用非锁定的一致性读。在事务隔离级别READ COMMITTED和REPEATABLE READ (InnoDB存储引擎的默认事务隔离级別)下,InnoDB存储引擎使用非锁定的一致性读。然而,对于快照数据的定义却不相同。在READ COMMITTED事务隔离级别下,对于快照数据,非一致性读总是读取被锁定行的最新一份快照数据。而在REPEATABLE READ事务隔离级别
下,对于快照数据,非一致性读总是读取事务开始时的行数据版本。

MVCC的实现过程

版本号

系统版本号:是一个递增的数字,每开始一个新的事务,系统版本号就会自动递增。

事务版本号:事务开始时的系统版本号。

隐藏的列

MVCC 在每行记录后面都保存着两个隐藏的列,用来存储两个版本号:

创建版本号:指示创建一个数据行的快照时的系统版本号;

删除版本号:如果该快照的删除版本号大于当前事务版本号表示该快照有效,否则表示该快照已经被删除了。

以下实现过程针对可重复读隔离级别。

当开始一个事务时,该事务的版本号肯定大于当前所有数据行快照的创建版本号,理解这一点很关键。数据行快照的创建版本号是创建数据行快照时的系统版本号,系统版本号随着创建事务而递增,因此新创建一个事务时,这个事务的系统版本号比之前的系统版本号都大,也就是比所有数据行快照的创建版本号都大。

  1. SELECT

多个事务必须读取到同一个数据行的快照,并且这个快照是距离现在最近的一个有效快照。但是也有例外,如果有一个事务正在修改该数据行,那么它可以读取事务本身所做的修改,而不用和其它事务的读取结果一致。

把没有对一个数据行做修改的事务称为 T,T 所要读取的数据行快照的创建版本号必须小于等于 T 的版本号,因为如果大于 T 的版本号,那么表示该数据行快照是其它事务的最新修改,因此不能去读取它。除此之外,T 所要读取的数据行快照的删除版本号必须是未定义或者大于 T 的版本号,因为如果小于等于 T 的版本号,那么表示该数据行快照是已经被删除的,不应该去读取它。

  1. INSERT
    将当前系统版本号作为数据行快照的创建版本号。

  2. DELETE
    将当前系统版本号作为数据行快照的删除版本号。

  3. UPDATE
    将当前系统版本号作为更新前的数据行快照的删除版本号,并将当前系统版本号作为更新后的数据行快照的创建版本号。可以理解为先执行 DELETE 后执行 INSERT。

一致性锁定读

InnoDB存储引擎对于select支持两种一致性的锁定读(locking read)操作:

  • SELECT — FOR UPDATE
  • SELECT — LOCK IN SHARE MODE

SELECT-FOR UPDATE对读取的行记录加一个X锁,其他事务不能对已锁定的行加上任何锁。SELECT-LOCK IN SHARE MODE对读取的行记录加一个S锁,其他事务可以向被锁定的行加S锁,但是如果加X锁,则会被阻塞。

锁的算法

行锁的三种算法

InnoDB存储引擎有3种行锁的算法,其分别是:

  • Record Lock:单个行记录上的锁
  • Gap Lock:间隙锁,锁定一个范围,但不包含记录本身
  • Next-Key Lock : Gap Lock+Record Lock,锁定一个范围,并且锁定记录本身

Record Lock总是会去锁住索引记录,如果InnoDB存储引擎表在建立的时候没有设置任何一个索引,那么这时InnoDB存储引擎会使用隐式的主键来进行锁定。Next-Key Lock 是结合了 Gap Lock 和 Record Lock 的一种锁定算法,在 Next-Key Lock算法下,InnoDB对于行的査询都是采用这种锁定算法。

例如一个索引有10, 11,13和20这四个值,那么该索引可能被Next-Key Locking的区间为:

(-00 ,10]
(10,11]
(11, 13]
(13, 20]
(20,+ ~)

当査询的索引含有唯一属性时,InnoDB存储引擎会对Next-Key Lock进行优化,将其降级为Record Lock,即仅锁住索引本身,而不是范围。(仅适用于查询的列是唯一索引的情况)

若唯一索引由多个列组成,而査询仅是査找多个唯一索引列中的其中一个,那么査询其实是range类型查询,而不是point类型查询,故InnoDB存储引擎依然使用Next-Key Lock进行锁定。

幻读问题

Phantom Problem是指在同一事务下,连续执行两次同样的SQL语句可能导致不同的结果,第二次的SQL语句可能会返回之前不存在的行。

InnoDB存储引擎默认的事务隔离级别是REPEATABLE READ,在该隔离级别下,其采用Next-Key Locking的方式来加锁。而在事务隔离级别READ COMMITTED下,其仅采用Record Lock。

事务并发一致性问题

丢失修改

T1 和 T2 两个事务都对一个数据进行修改,T1 先修改,T2 随后修改,T2 的修改覆盖了 T1 的修改。

读脏数据

T1 修改一个数据,T2 随后读取这个数据。如果 T1 撤销了这次修改,那么 T2 读取的数据是脏数据。

不可重复读

T2 读取一个数据,T1 对该数据做了修改。如果 T2 再次读取这个数据,此时读取的结果和第一次读取的结果不同。

幻影读

T1 读取某个范围的数据,T2 在这个范围内插入新的数据,T1 再次读取这个范围的数据,此时读取的结果和和第一次读取的结果不同。

产生并发不一致性问题主要原因是破坏了事务的隔离性,解决方法是通过并发控制来保证隔离性。并发控制可以通过封锁来实现,但是封锁操作需要用户自己控制,相当复杂。数据库管理系统提供了事务的隔离级别,让用户以一种更轻松的方式处理并发一致性问题。

事务隔离级别

  • 未提交读(READ UNCOMMITTED) —事务中的修改,即使没有提交,对其它事务也是可见的。

  • 提交读(READ COMMITTED) — 一个事务只能读取已经提交的事务所做的修改。换句话说,一个事务所做的修改在提交之前对其它事务是不可见的。

  • 可重复读(REPEATABLE READ)—保证在同一个事务中多次读取同样数据的结果是一样的。

  • 可串行化(SERIALIZABLE)–强制事务串行执行。

隔离级别 脏读 不可重复读 幻影读
未提交读
提交读 ×
可重复读 × ×
可串行化 × × ×

阻塞

因为不同锁之间的兼容性关系,在有些时刻一个事务中的锁需要等待另一个事务中的锁释放它所占用的资源,这就是阻塞。阻塞并不是一件坏事,其是为了确保事务可以并发且正常地运行。

在InnoDB存储引擎中,参数innodb_lock_wait_timeout用来控制等待的时间(默认是50秒),innodb_rollback_on_timeout用来设定是否在等待超时时对进行中的事务进行回滚操作(默认是OFF,代表不回滚)。

在默认情况下InnoDB存储引擎不会回滚超时引发的错误异常。其实InnoDB存储引擎在大部分情况下都不会对异常进行回滚。

锁升级

锁升级(Lock Escalation)是指将当前锁的粒度降低。举例来说,数据库可以把一个表的1000个行锁升级为一个页锁,或者将页锁升级为表锁。如果在数据库的设计中认为锁是一种稀有资源,而且想避免锁的开销,那数据库中会频繁出现锁升级现象。

InnoDB存储引擎不存在锁升级的问题。因为其不是根据每个记录来产生行锁的,相
反,其根据每个事务访问的每个页对锁进行管理的,采用的是位图的方式。因此不管一
个事务锁住页中一个记录还是多个记录,其开销通常都是一致的。

MySQL技术内幕(InnoDB存储引擎)

CS-Note

Paxos算法

Paxos算法核心是一个一致性算法。
在该一致性算法中,有三种参与角色,Proposer,Acceptor和Learner。

选定提案算法流程:

阶段一

  1. Proposer选择一个提案编号K,然后想Acceptor的某个超过半数的子集成员发送编号为K的Prepare请求。
  2. 如果一个Acceptor收到一个编号为K的Prepare请求,如果K大于该Acceptor已经响应的所有Prepare请求的编号,那么它就会将已经批准过的最大编号提案作为响应反馈给Proposer;如果该Acceptor之前未批准过提案,那么直接返回空响应。该Acceptor承诺不会再批准小于K的提案,设置接收的提案值为K。

阶段二:

  1. 如果Proposer收到半数以上的Acceptor对于其发出的编号为K的Prepare请求响应,那么它就会发送一个针对[K,V]提案的Accept请求给Acceptor。(V为返回的所有响应中编号最大提案的值)
  2. 如果Acceptor收到这个[K,V]提案的Accept请求,只要该Acceptor尚未对编号大于K的Prepare请求作出响应,它就可以通过提案。

提案的获取

方案一

一旦Acceptor批准了一个提案,就将该提案发送给所有Learner。
需要让每个Acceptor与所有Learner逐个进行通信,通信次数至少为二者乘积。

方案二

所有的Acceptor将提案批准情况统一发送给一个特定的Learner,它来负责通知其他的Learner。
问题:主Learner随时可能出现故障

方案三

Acceptor将批准的天发送给特定的Learner集合

实例

Prepare 阶段

下图演示了两个 Proposer 和三个 Acceptor 的系统中运行该算法的初始过程,每个 Proposer 都会向所有 Acceptor 发送 Prepare 请求。

当 Acceptor 接收到一个 Prepare 请求,包含的提议为 [n1, v1],并且之前还未接收过 Prepare 请求,那么发送一个 Prepare 响应,设置当前接收到的提议为 [n1, v1],并且保证以后不会再接受序号小于 n1 的提议。

如下图,Acceptor X 在收到 [n=2, v=8] 的 Prepare 请求时,由于之前没有接收过提议,因此就发送一个 [no previous] 的 Prepare 响应,设置当前接收到的提议为 [n=2, v=8],并且保证以后不会再接受序号小于 2 的提议。其它的 Acceptor 类似。

如果 Acceptor 接收到一个 Prepare 请求,包含的提议为 [n2, v2],并且之前已经接收过提议 [n1, v1]。如果 n1 > n2,那么就丢弃该提议请求;否则,发送 Prepare 响应,该 Prepare 响应包含之前已经接收过的提议 [n1, v1],设置当前接收到的提议为 [n2, v2],并且保证以后不会再接受序号小于 n2 的提议。

如下图,Acceptor Z 收到 Proposer A 发来的 [n=2, v=8] 的 Prepare 请求,由于之前已经接收过 [n=4, v=5] 的提议,并且 n > 2,因此就抛弃该提议请求;Acceptor X 收到 Proposer B 发来的 [n=4, v=5] 的 Prepare 请求,因为之前接收到的提议为 [n=2, v=8],并且 2 <= 4,因此就发送 [n=2, v=8] 的 Prepare 响应,设置当前接收到的提议为 [n=4, v=5],并且保证以后不会再接受序号小于 4 的提议。Acceptor Y 类似。

Accept 阶段

当一个 Proposer 接收到超过一半 Acceptor 的 Prepare 响应时,就可以发送 Accept 请求。

Proposer A 接收到两个 Prepare 响应之后,就发送 [n=2, v=8] Accept 请求。该 Accept 请求会被所有 Acceptor 丢弃,因为此时所有 Acceptor 都保证不接受序号小于 4 的提议。

Proposer B 过后也收到了两个 Prepare 响应,因此也开始发送 Accept 请求。需要注意的是,Accept 请求的 v 需要取它收到的最大提议编号对应的 v 值,也就是 8。因此它发送 [n=4, v=8] 的 Accept 请求。

Learn 阶段

Acceptor 接收到 Accept 请求时,如果序号大于等于该 Acceptor 承诺的最小序号,那么就发送 Learn 提议给所有的 Learner。当 Learner 发现有大多数的 Acceptor 接收了某个提议,那么该提议的提议值就被 Paxos 选择出来。

Raft算法

https://github.com/CyC2018/CS-Notes/blob/master/notes/%E5%88%86%E5%B8%83%E5%BC%8F.md#%E4%BA%94paxos

ElasticSearch基本概念

  • 节点(node): 一个节点是一个逻辑上独立的服务,可以存储数据,并参与集群的索引和搜索功能, 一个节点也有唯一的名字,群集通过节点名称进行管理和通信.

  • 索引(Index) : 索引与关系型数据库实例(Database)相当。索引只是一个 逻辑命名空间,它指向一个或多个分片(shards),内部用Apache Lucene实现索引中数据的读写

  • 文档类型(Type):相当于数据库中的table概念。每个文档在ElasticSearch中都必须设定它的类型。文档类型使得同一个索引中在存储结构不同文档时,只需要依据文档类型就可以找到对应的参数映射(Mapping)信息,方便文档的存取

  • 文档(Document) :相当于数据库中的row, 是可以被索引的基本单位。文档是以JSON格式存储的。在一个索引中,您可以存储多个的文档。请注意,虽然在一个索引中有多分文档,但这些文档的结构是一致的,并在第一次存储的时候指定, 文档属于一种 类型(type),各种各样的类型存在于一个索引中。

  • 集群(Cluster): 包含一个或多个具有相同 cluster.name 的节点.

    1. 集群内节点协同工作,共享数据,并共同分担工作负荷。
      
    2. 由于节点是从属集群的,集群会自我重组来均匀地分发数据. 
      
    3. cluster Name是很重要的,因为每个节点只能是群集的一部分,当该节点被设置为相同的名称时,就会自动加入群集。
      
    4. 集群中通过选举产生一个mater节点,它将负责管理集群范畴的变更,例如创建或删除索引,添加节点到集群或从集群删除节点。master 节点无需参与文档层面的变更和搜索,这意味着仅有一个 master 节点并不会因流量增长而成为瓶颈。任意一个节点都可以成为 master 节点。我们例举的集群只有一个节点,因此它会扮演 master 节点的角色。
      
    5. 作为用户,我们可以访问包括 master 节点在内的集群中的任一节点。每个节点都知道各个文档的位置,并能够将我们的请求直接转发到拥有我们想要的数据的节点。无论我们访问的是哪个节点,它都会控制从拥有数据的节点收集响应的过程,并返回给客户端最终的结果。这一切都是由 Elasticsearch 透明管理的
      
  • 分片(shard) :是 工作单元(worker unit) 底层的一员,用来分配集群中的数据,它只负责保存索引中所有数据的一小片。

    1. 分片是一个独立的Lucene实例,并且它自身也是一个完整的搜索引擎。
    2. 文档存储并且被索引在分片中,但是我们的程序并不会直接与它们通信。取而代之,它们直接与索引进行通信的
      
    3. 把分片想象成一个数据的容器。数据被存储在分片中,然后分片又被分配在集群的节点上。当你的集群扩展或者缩小时,elasticsearch 会自动的在节点之间迁移分配分片,以便集群保持均衡
      
    4. 分片分为 主分片(primary shard) 以及 从分片(replica shard) 两种。在你的索引中,每一个文档都属于一个主分片
      
    5. 从分片只是主分片的一个副本,它用于提供数据的冗余副本,在硬件故障时提供数据保护,同时服务于搜索和检索这种只读请求
      
    6. 索引中的主分片的数量在索引创建后就固定下来了,但是从分片的数量可以随时改变。
      
    7. 一个索引默认设置了5个主分片,每个主分片有一个从分片对应
      
  • 副本(Replica):同一个分片(Shard)的备份数据,一个分片可能会有0个或多个副本,这些副本中的数据保证强一致或最终一致。

Elasticsearch集群

Elasticsearch集群搜索

(全文转载自 Elasticsearch内核解析 - 查询篇

目前的Elasticsearch有两个明显的身份,一个是分布式搜索系统,另一个是分布式NoSQL数据库,对于这两种不同的身份,读写语义基本类似,但也有一点差异。

读操作

实时性对于搜索而言是近实时的,延迟在100ms以上,对于NoSQL则需要是实时的。

一致性指的是写入成功后,下次读操作一定要能读取到最新的数据。对于搜索,这个要求会低一些,可以有一些延迟。但是对于NoSQL数据库,则一般要求最好是强一致性的。

结果匹配上,NoSQL作为数据库,查询过程中只有符合不符合两种情况,而搜索里面还有是否相关,类似于NoSQL的结果只能是0或1,而搜索里面可能会有0.1,0.5,0.9等部分匹配或者更相关的情况。

结果召回上,搜索一般只需要召回最满足条件的Top N结果即可,而NoSQL一般都需要返回满足条件的所有结果。

搜索系统一般都是两阶段查询,第一个阶段查询到对应的Doc ID,也就是PK;第二阶段再通过Doc ID去查询完整文档,而NoSQL数据库一般是一阶段就返回结果。在Elasticsearch中两种都支持。

目前NoSQL的查询,聚合、分析和统计等功能上都是要比搜索弱的。

Lucene的读

Elasticsearch使用了Lucene作为搜索引擎库,通过Lucene完成特定字段的搜索等功能,在Lucene中这个功能是通过IndexSearcher的下列接口实现的:

public TopDocs search(Query query, int n);
public Document doc(int docID);
public int count(Query query);
......(其他)

第一个search接口实现搜索功能,返回最满足Query的N个结果;第二个doc接口通过doc id查询Doc内容;第三个count接口通过Query获取到命中数。

这三个功能是搜索中的最基本的三个功能点,对于大部分Elasticsearch中的查询都是比较复杂的,直接用这个接口是无法满足需求的,比如分布式问题。这些问题都留给了Elasticsearch解决,我们接下来看Elasticsearch中相关读功能的剖析。

Elasticsearch的读

Elasticsearch中每个Shard都会有多个Replica,主要是为了保证数据可靠性,除此之外,还可以增加读能力,因为写的时候虽然要写大部分Replica Shard,但是查询的时候只需要查询Primary和Replica中的任何一个就可以了。

在上图中,该Shard有1个Primary和2个Replica Node,当查询的时候,从三个节点中根据Request中的preference参数选择一个节点查询。preference可以设置_local,_primary,_replica以及其他选项。如果选择了primary,则每次查询都是直接查询Primary,可以保证每次查询都是最新的。如果设置了其他参数,那么可能会查询到R1或者R2,这时候就有可能查询不到最新的数据。

接下来看一下,Elasticsearch中的查询是如何支持分布式的。

Elasticsearch中通过分区实现分布式,数据写入的时候根据_routing规则将数据写入某一个Shard中,这样就能将海量数据分布在多个Shard以及多台机器上,已达到分布式的目标。这样就导致了查询的时候,潜在数据会在当前index的所有的Shard中,所以Elasticsearch查询的时候需要查询所有Shard,同一个Shard的Primary和Replica选择一个即可,查询请求会分发给所有Shard,每个Shard中都是一个独立的查询引擎,比如需要返回Top 10的结果,那么每个Shard都会查询并且返回Top 10的结果,然后在Client Node里面会接收所有Shard的结果,然后通过优先级队列二次排序,选择出Top 10的结果返回给用户。

这里有一个问题就是请求膨胀,用户的一个搜索请求在Elasticsearch内部会变成Shard个请求,这里有个优化点,虽然是Shard个请求,但是这个Shard个数不一定要是当前Index中的Shard个数,只要是当前查询相关的Shard即可,这个需要基于业务和请求内容优化,通过这种方式可以优化请求膨胀数。

Elasticsearch中的查询主要分为两类,Get请求:通过ID查询特定Doc;Search请求:通过Query查询匹配Doc。

上图中内存中的Segment是指刚Refresh Segment,但是还没持久化到磁盘的新Segment,而非从磁盘加载到内存中的Segment。

对于Search类请求,查询的时候是一起查询内存和磁盘上的Segment,最后将结果合并后返回。这种查询是近实时(Near Real Time)的,主要是由于内存中的Index数据需要一段时间后才会刷新为Segment。

对于Get类请求,查询的时候是先查询内存中的TransLog,如果找到就立即返回,如果没找到再查询磁盘上的TransLog,如果还没有则再去查询磁盘上的Segment。这种查询是实时(Real Time)的。这种查询顺序可以保证查询到的Doc是最新版本的Doc,这个功能也是为了保证NoSQL场景下的实时性要求。

所有的搜索系统一般都是两阶段查询,第一阶段查询到匹配的DocID,第二阶段再查询DocID对应的完整文档,这种在Elasticsearch中称为query_then_fetch,还有一种是一阶段查询的时候就返回完整Doc,在Elasticsearch中称作query_and_fetch,一般第二种适用于只需要查询一个Shard的请求。

除了一阶段,两阶段外,还有一种三阶段查询的情况。搜索里面有一种算分逻辑是根据TF(Term Frequency)和DF(Document Frequency)计算基础分,但是Elasticsearch中查询的时候,是在每个Shard中独立查询的,每个Shard中的TF和DF也是独立的,虽然在写入的时候通过_routing保证Doc分布均匀,但是没法保证TF和DF均匀,那么就有会导致局部的TF和DF不准的情况出现,这个时候基于TF、DF的算分就不准。为了解决这个问题,Elasticsearch中引入了DFS查询,比如DFS_query_then_fetch,会先收集所有Shard中的TF和DF值,然后将这些值带入请求中,再次执行query_then_fetch,这样算分的时候TF和DF就是准确的,类似的有DFS_query_and_fetch。这种查询的优势是算分更加精准,但是效率会变差。另一种选择是用BM25代替TF/DF模型。

在新版本Elasticsearch中,用户没法指定DFS_query_and_fetch和query_and_fetch,这两种只能被Elasticsearch系统改写。

Elasticsearch查询流程

Elasticsearch中的大部分查询,以及核心功能都是Search类型查询,上面我们了解到查询分为一阶段,二阶段和三阶段,这里我们就以最常见的的二阶段查询为例来介绍查询流程。

注册Action

Elasticsearch中,查询和写操作一样都是在ActionModule.java中注册入口处理函数的。

registerHandler.accept(new RestSearchAction(settings, restController));
......
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
......

如果请求是Rest请求,则会在RestSearchAction中解析请求,检查查询类型,不能设置为dfs_query_and_fetch或者query_and_fetch,这两个目前只能用于Elasticsearch中的优化场景,然后将请求发给后面的TransportSearchAction处理。然后构造SearchRequest,将请求发送给TransportSearchAction处理。

如果是第一阶段的Query Phase请求,则会调用SearchService的executeQueryPhase方法。

如果是第二阶段的Fetch Phase请求,则会调用SearchService的executeFetchPhase方法。

Client Node

Client Node 也包括了前面说过的Parse Request,这里就不再赘述了,接下来看一下其他的部分。

  1. Get Remove Cluster Shard

判断是否需要跨集群访问,如果需要,则获取到要访问的Shard列表。

  1. Get Search Shard Iterator

获取当前Cluster中要访问的Shard,和上一步中的Remove Cluster Shard合并,构建出最终要访问的完整Shard列表。

这一步中,会根据Request请求中的参数从Primary Node和多个Replica Node中选择出一个要访问的Shard。

  1. For Every Shard:Perform

遍历每个Shard,对每个Shard执行后面逻辑。

  1. Send Request To Query Shard

将查询阶段请求发送给相应的Shard。

  1. Merge Docs

上一步将请求发送给多个Shard后,这一步就是异步等待返回结果,然后对结果合并。这里的合并策略是维护一个Top N大小的优先级队列,每当收到一个shard的返回,就把结果放入优先级队列做一次排序,直到所有的Shard都返回。

翻页逻辑也是在这里,如果需要取Top 30~ Top 40的结果,这个的意思是所有Shard查询结果中的第30到40的结果,那么在每个Shard中无法确定最终的结果,每个Shard需要返回Top 40的结果给Client Node,然后Client Node中在merge docs的时候,计算出Top 40的结果,最后再去除掉Top 30,剩余的10个结果就是需要的Top 30~ Top 40的结果。

上述翻页逻辑有一个明显的缺点就是每次Shard返回的数据中包括了已经翻过的历史结果,如果翻页很深,则在这里需要排序的Docs会很多,比如Shard有1000,取第9990到10000的结果,那么这次查询,Shard总共需要返回1000 * 10000,也就是一千万Doc,这种情况很容易导致OOM。

另一种翻页方式是使用search_after,这种方式会更轻量级,如果每次只需要返回10条结构,则每个Shard只需要返回search_after之后的10个结果即可,返回的总数据量只是和Shard个数以及本次需要的个数有关,和历史已读取的个数无关。这种方式更安全一些,推荐使用这种。

如果有aggregate,也会在这里做聚合,但是不同的aggregate类型的merge策略不一样,具体的可以在后面的aggregate文章中再介绍。

  1. Send Request To Fetch Shard

选出Top N个Doc ID后发送给这些Doc ID所在的Shard执行Fetch Phase,最后会返回Top N的Doc的内容。

Query Phase

接下来我们看第一阶段查询的步骤:

  1. Create Search Context

创建Search Context,之后Search过程中的所有中间状态都会存在Context中,这些状态总共有50多个,具体可以查看DefaultSearchContext或者其他SearchContext的子类。

  1. Parse Query

解析Query的Source,将结果存入Search Context。这里会根据请求中Query类型的不同创建不同的Query对象,比如TermQuery、FuzzyQuery等,最终真正执行TermQuery、FuzzyQuery等语义的地方是在Lucene中。

这里包括了dfsPhase、queryPhase和fetchPhase三个阶段的preProcess部分,只有queryPhase的preProcess中有执行逻辑,其他两个都是空逻辑,执行完preProcess后,所有需要的参数都会设置完成。

由于Elasticsearch中有些请求之间是相互关联的,并非独立的,比如scroll请求,所以这里同时会设置Context的生命周期。

同时会设置lowLevelCancellation是否打开,这个参数是集群级别配置,同时也能动态开关,打开后会在后面执行时做更多的检测,检测是否需要停止后续逻辑直接返回。

  1. Get From Cache

判断请求是否允许被Cache,如果允许,则检查Cache中是否已经有结果,如果有则直接读取Cache,如果没有则继续执行后续步骤,执行完后,再将结果加入Cache。

  1. Add Collectors

Collector主要目标是收集查询结果,实现排序,对自定义结果集过滤和收集等。这一步会增加多个Collectors,多个Collector组成一个List。

  1. FilteredCollector:先判断请求中是否有Post Filter,Post Filter用于Search,Agg等结束后再次对结果做Filter,希望Filter不影响Agg结果。如果有Post Filter则创建一个FilteredCollector,加入Collector List中。

  2. PluginInMultiCollector:判断请求中是否制定了自定义的一些Collector,如果有,则创建后加入Collector List。

  3. MinimumScoreCollector:判断请求中是否制定了最小分数阈值,如果指定了,则创建MinimumScoreCollector加入Collector List中,在后续收集结果时,会过滤掉得分小于最小分数的Doc。

  4. EarlyTerminatingCollector:判断请求中是否提前结束Doc的Seek,如果是则创建EarlyTerminatingCollector,加入Collector List中。在后续Seek和收集Doc的过程中,当Seek的Doc数达到Early Terminating后会停止Seek后续倒排链。

  5. CancellableCollector:判断当前操作是否可以被中断结束,比如是否已经超时等,如果是会抛出一个TaskCancelledException异常。该功能一般用来提前结束较长的查询请求,可以用来保护系统。

  6. EarlyTerminatingSortingCollector:如果Index是排序的,那么可以提前结束对倒排链的Seek,相当于在一个排序递减链表上返回最大的N个值,只需要直接返回前N个值就可以了。这个Collector会加到Collector List的头部。EarlyTerminatingSorting和EarlyTerminating的区别是,EarlyTerminatingSorting是一种对结果无损伤的优化,而EarlyTerminating是有损的,人为掐断执行的优化。

  7. TopDocsCollector:这个是最核心的Top N结果选择器,会加入到Collector List的头部。TopScoreDocCollector和TopFieldCollector都是TopDocsCollector的子类,TopScoreDocCollector会按照固定的方式算分,排序会按照分数+doc id的方式排列,如果多个doc的分数一样,先选择doc id小的文档。而TopFieldCollector则是根据用户指定的Field的值排序。

  8. lucene::search

这一步会调用Lucene中IndexSearch的search接口,执行真正的搜索逻辑。每个Shard中会有多个Segment,每个Segment对应一个LeafReaderContext,这里会遍历每个Segment,到每个Segment中去Search结果,然后计算分数。

搜索里面一般有两阶段算分,第一阶段是在这里算的,会对每个Seek到的Doc都计算分数,为了减少CPU消耗,一般是算一个基本分数。这一阶段完成后,会有个排序。然后在第二阶段,再对Top 的结果做一次二阶段算分,在二阶段算分的时候会考虑更多的因子。二阶段算分在后续操作中。

具体请求,比如TermQuery、WildcardQuery的查询逻辑都在Lucene中,后面会有专门文章介绍。

  1. rescore

根据Request中是否包含rescore配置决定是否进行二阶段排序,如果有则执行二阶段算分逻辑,会考虑更多的算分因子。二阶段算分也是一种计算机中常见的多层设计,是一种资源消耗和效率的折中。

Elasticsearch中支持配置多个Rescore,这些rescore逻辑会顺序遍历执行。每个rescore内部会先按照请求参数window选择出Top window的doc,然后对这些doc排序,排完后再合并回原有的Top 结果顺序中。

  1. suggest::execute()

如果有推荐请求,则在这里执行推荐请求。如果请求中只包含了推荐的部分,则很多地方可以优化。推荐不是今天的重点,这里就不介绍了,后面有机会再介绍。

  1. aggregation::execute()

如果含有聚合统计请求,则在这里执行。Elasticsearch中的aggregate的处理逻辑也类似于Search,通过多个Collector来实现。在Client Node中也需要对aggregation做合并。aggregate逻辑更复杂一些,就不在这里赘述了,后面有需要就再单独开文章介绍。

上述逻辑都执行完成后,如果当前查询请求只需要查询一个Shard,那么会直接在当前Node执行Fetch Phase。

Fetch Phase

Elasticsearch作为搜索系统时,或者任何搜索系统中,除了Query阶段外,还会有一个Fetch阶段,这个Fetch阶段在数据库类系统中是没有的,是搜索系统中额外增加的阶段。搜索系统中额外增加Fetch阶段的原因是搜索系统中数据分布导致的,在搜索中,数据通过routing分Shard的时候,只能根据一个主字段值来决定,但是查询的时候可能会根据其他非主字段查询,那么这个时候所有Shard中都可能会存在相同非主字段值的Doc,所以需要查询所有Shard才能不会出现结果遗漏。同时如果查询主字段,那么这个时候就能直接定位到Shard,就只需要查询特定Shard即可,这个时候就类似于数据库系统了。另外,数据库中的二级索引又是另外一种情况,但类似于查主字段的情况,这里就不多说了。

基于上述原因,第一阶段查询的时候并不知道最终结果会在哪个Shard上,所以每个Shard中管都需要查询完整结果,比如需要Top 10,那么每个Shard都需要查询当前Shard的所有数据,找出当前Shard的Top 10,然后返回给Client Node。如果有100个Shard,那么就需要返回100 * 10 = 1000个结果,而Fetch Doc内容的操作比较耗费IO和CPU,如果在第一阶段就Fetch Doc,那么这个资源开销就会非常大。所以,一般是当Client Node选择出最终Top N的结果后,再对最终的Top N读取Doc内容。通过增加一点网络开销而避免大量IO和CPU操作,这个折中是非常划算的。

Fetch阶段的目的是通过DocID获取到用户需要的完整Doc内容。这些内容包括了DocValues,Store,Source,Script和Highlight等,具体的功能点是在SearchModule中注册的,系统默认注册的有:

  • ExplainFetchSubPhase
  • DocValueFieldsFetchSubPhase
  • ScriptFieldsFetchSubPhase
  • FetchSourceSubPhase
  • VersionFetchSubPhase
  • MatchedQueriesFetchSubPhase
  • HighlightPhase
  • ParentFieldSubFetchPhase

除了系统默认的8种外,还有通过插件的形式注册自定义的功能,这些SubPhase中最重要的是Source和Highlight,Source是加载原文,Highlight是计算高亮显示的内容片断。

上述多个SubPhase会针对每个Doc顺序执行,可能会产生多次的随机IO,这里会有一些优化方案,但是都是针对特定场景的,不具有通用性。

Fetch Phase执行完后,整个查询流程就结束了。

Elasticsearch集群写入

(全文转载自 Elasticsearch内核解析 - 写入篇

写操作

  • 实时性:
    • 搜索系统的Index一般都是NRT(Near Real Time),近实时的,比如Elasticsearch中,Index的实时性是由refresh控制的,默认是1s,最快可到100ms,那么也就意味着Index doc成功后,需要等待一秒钟后才可以被搜索到。
    • NoSQL数据库的Write基本都是RT(Real Time),实时的,写入成功后,立即是可见的。Elasticsearch中的Index请求也能保证是实时的,因为Get请求会直接读内存中尚未Flush到存储介质的TransLog。
  • 可靠性:
    • 搜索系统对可靠性要求都不高,一般数据的可靠性通过将原始数据存储在另一个存储系统来保证,当搜索系统的数据发生丢失时,再从其他存储系统导一份数据过来重新rebuild就可以了。在Elasticsearch中,通过设置TransLog的Flush频率可以控制可靠性,要么是按请求,每次请求都Flush;要么是按时间,每隔一段时间Flush一次。一般为了性能考虑,会设置为每隔5秒或者1分钟Flush一次,Flush间隔时间越长,可靠性就会越低。
    • NoSQL数据库作为一款数据库,必须要有很高的可靠性,数据可靠性是生命底线,决不能有闪失。如果把Elasticsearch当做NoSQL数据库,此时需要设置TransLog的Flush策略为每个请求都要Flush,这样才能保证当前Shard写入成功后,数据能尽量持久化下来。

写操作的关键点

在考虑或分析一个分布式系统的写操作时,一般需要从下面几个方面考虑:

  • 可靠性:或者是持久性,数据写入系统成功后,数据不会被回滚或丢失。
  • 一致性:数据写入成功后,再次查询时必须能保证读取到最新版本的数据,不能读取到旧数据。
  • 原子性:一个写入或者更新操作,要么完全成功,要么完全失败,不允许出现中间状态。
  • 隔离性:多个写入操作相互不影响。
  • 实时性:写入后是否可以立即被查询到。
  • 性能:写入性能,吞吐量到底怎么样。

Elasticsearch作为分布式系统,也需要在写入的时候满足上述的四个特点,我们在后面的写流程介绍中会涉及到上述四个方面。

接下来,我们一层一层剖析Elasticsearch内部的写机制。

Lucene的写

众所周知,Elasticsearch内部使用了Lucene完成索引创建和搜索功能,Lucene中写操作主要是通过IndexWriter类实现,IndexWriter提供三个接口:

 public long addDocument();
 public long updateDocuments();
 public long deleteDocuments();

通过这三个接口可以完成单个文档的写入,更新和删除功能,包括了分词,倒排创建,正排创建等等所有搜索相关的流程。只要Doc通过IndesWriter写入后,后面就可以通过IndexSearcher搜索了,看起来功能已经完善了,但是仍然有一些问题没有解:

  1. 上述操作是单机的,而不是我们需要的分布式。
  2. 文档写入Lucene后并不是立即可查询的,需要生成完整的Segment后才可被搜索,如何保证实时性?
  3. Lucene生成的Segment是在内存中,如果机器宕机或掉电后,内存中的Segment会丢失,如何保证数据可靠性 ?
  4. Lucene不支持部分文档更新,但是这又是一个强需求,如何支持部分更新?

上述问题,在Lucene中是没有解决的,那么就需要Elasticsearch中解决上述问题。

Elasticsearch的写

Elasticsearch采用多Shard方式,通过配置routing规则将数据分成多个数据子集,每个数据子集提供独立的索引和搜索功能。当写入文档的时候,根据routing规则,将文档发送给特定Shard中建立索引。这样就能实现分布式了。

此外,Elasticsearch整体架构上采用了一主多副的方式:

每个Index由多个Shard组成,每个Shard有一个主节点和多个副本节点,副本个数可配。但每次写入的时候,写入请求会先根据_routing规则选择发给哪个Shard,Index Request中可以设置使用哪个Filed的值作为路由参数,如果没有设置,则使用Mapping中的配置,如果mapping中也没有配置,则使用_id作为路由参数,然后通过_routing的Hash值选择出Shard(在OperationRouting类中),最后从集群的Meta中找出出该Shard的Primary节点。

请求接着会发送给Primary Shard,在Primary Shard上执行成功后,再从Primary Shard上将请求同时发送给多个Replica Shard,请求在多个Replica Shard上执行成功并返回给Primary Shard后,写入请求执行成功,返回结果给客户端。

这种模式下,写入操作的延时就等于latency = Latency(Primary Write) + Max(Replicas Write)。只要有副本在,写入延时最小也是两次单Shard的写入时延总和,写入效率会较低,但是这样的好处也很明显,避免写入后,单机或磁盘故障导致数据丢失,在数据重要性和性能方面,一般都是优先选择数据,除非一些允许丢数据的特殊场景。

采用多个副本后,避免了单机或磁盘故障发生时,对已经持久化后的数据造成损害,但是Elasticsearch里为了减少磁盘IO保证读写性能,一般是每隔一段时间(比如5分钟)才会把Lucene的Segment写入磁盘持久化,对于写入内存,但还未Flush到磁盘的Lucene数据,如果发生机器宕机或者掉电,那么内存中的数据也会丢失,这时候如何保证?

对于这种问题,Elasticsearch学习了数据库中的处理方式:增加CommitLog模块,Elasticsearch中叫TransLog。

在每一个Shard中,写入流程分为两部分,先写入Lucene,再写入TransLog。

写入请求到达Shard后,先写Lucene文件,创建好索引,此时索引还在内存里面,接着去写TransLog,写完TransLog后,刷新TransLog数据到磁盘上,写磁盘成功后,请求返回给用户。这里有几个关键点,一是和数据库不同,数据库是先写CommitLog,然后再写内存,而Elasticsearch是先写内存,最后才写TransLog,一种可能的原因是Lucene的内存写入会有很复杂的逻辑,很容易失败,比如分词,字段长度超过限制等,比较重,为了避免TransLog中有大量无效记录,减少recover的复杂度和提高速度,所以就把写Lucene放在了最前面。二是写Lucene内存后,并不是可被搜索的,需要通过Refresh把内存的对象转成完整的Segment后,然后再次reopen后才能被搜索,一般这个时间设置为1秒钟,导致写入Elasticsearch的文档,最快要1秒钟才可被从搜索到,所以Elasticsearch在搜索方面是NRT(Near Real Time)近实时的系统。三是当Elasticsearch作为NoSQL数据库时,查询方式是GetById,这种查询可以直接从TransLog中查询,这时候就成了RT(Real Time)实时系统。四是每隔一段比较长的时间,比如30分钟后,Lucene会把内存中生成的新Segment刷新到磁盘上,刷新后索引文件已经持久化了,历史的TransLog就没用了,会清空掉旧的TransLog。

上面介绍了Elasticsearch在写入时的两个关键模块,Replica和TransLog,接下来,我们看一下Update流程:

Lucene中不支持部分字段的Update,所以需要在Elasticsearch中实现该功能,具体流程如下:

  1. 收到Update请求后,从Segment或者TransLog中读取同id的完整Doc,记录版本号为V1。
  2. 将版本V1的全量Doc和请求中的部分字段Doc合并为一个完整的Doc,同时更新内存中的VersionMap。获取到完整Doc后,Update请求就变成了Index请求。
  3. 加锁。
  4. 再次从versionMap中读取该id的最大版本号V2,如果versionMap中没有,则从Segment或者TransLog中读取,这里基本都会从versionMap中获取到。
  5. 检查版本是否冲突(V1==V2),如果冲突,则回退到开始的“Update doc”阶段,重新执行。如果不冲突,则执行最新的Add请求。
  6. 在Index Doc阶段,首先将Version + 1得到V3,再将Doc加入到Lucene中去,Lucene中会先删同id下的已存在doc id,然后再增加新Doc。写入Lucene成功后,将当前V3更新到versionMap中。
  7. 释放锁,部分更新的流程就结束了。

介绍完部分更新的流程后,大家应该从整体架构上对Elasticsearch的写入有了一个初步的映象,接下来我们详细剖析下写入的详细步骤。

Elasticsearch写入请求类型

Elasticsearch中的写入请求类型,主要包括下列几个:Index(Create),Update,Delete和Bulk,其中前3个是单文档操作,后一个Bulk是多文档操作,其中Bulk中可以包括Index(Create),Update和Delete。

在6.0.0及其之后的版本中,前3个单文档操作的实现基本都和Bulk操作一致,甚至有些就是通过调用Bulk的接口实现的。估计接下来几个版本后,Index(Create),Update,Delete都会被当做Bulk的一种特例化操作被处理。这样,代码和逻辑都会更清晰一些。

下面,我们就以Bulk请求为例来介绍写入流程。

Elasticsearch写入流程图

  • 红色:Client Node。
  • 绿色:Primary Node。
  • 蓝色:Replica Node。

注册Action

在Elasticsearch中,所有action的入口处理方法都是注册在ActionModule.java中,比如Bulk Request有两个注册入口,分别是Rest和Transport入口.

如果请求是Rest请求,则会在RestBulkAction中Parse Request,构造出BulkRequest,然后发给后面的TransportAction处理。

TransportShardBulkAction的基类TransportReplicationAction中注册了对Primary,Replica等的不同处理入口:

这里对原始请求,Primary Node请求和Replica Node请求各自注册了一个handler处理入口。

Client Node

Client Node 也包括了前面说过的Parse Request,这里就不再赘述了,接下来看一下其他的部分。

  1. Ingest Pipeline

在这一步可以对原始文档做一些处理,比如HTML解析,自定义的处理,具体处理逻辑可以通过插件来实现。在Elasticsearch中,由于Ingest Pipeline会比较耗费CPU等资源,可以设置专门的Ingest Node,专门用来处理Ingest Pipeline逻辑。

如果当前Node不能执行Ingest Pipeline,则会将请求发给另一台可以执行Ingest Pipeline的Node。

  1. Auto Create Index

判断当前Index是否存在,如果不存在,则需要自动创建Index,这里需要和Master交互。也可以通过配置关闭自动创建Index的功能。

  1. Set Routing

设置路由条件,如果Request中指定了路由条件,则直接使用Request中的Routing,否则使用Mapping中配置的,如果Mapping中无配置,则使用默认的_id字段值。

在这一步中,如果没有指定id字段,则会自动生成一个唯一的_id字段,目前使用的是UUID。

  1. Construct BulkShardRequest

由于Bulk Request中会包括多个(Index/Update/Delete)请求,这些请求根据routing可能会落在多个Shard上执行,这一步会按Shard挑拣Single Write Request,同一个Shard中的请求聚集在一起,构建BulkShardRequest,每个BulkShardRequest对应一个Shard。

  1. Send Request To Primary

这一步会将每一个BulkShardRequest请求发送给相应Shard的Primary Node。

Primary Node

Primary 请求的入口是在PrimaryOperationTransportHandler的messageReceived,我们来看一下相关的逻辑流程。

  1. Index or Update or Delete

循环执行每个Single Write Request,对于每个Request,根据操作类型(CREATE/INDEX/UPDATE/DELETE)选择不同的处理逻辑。

其中,Create/Index是直接新增Doc,Delete是直接根据_id删除Doc,Update会稍微复杂些,我们下面就以Update为例来介绍。

  1. Translate Update To Index or Delete

这一步是Update操作的特有步骤,在这里,会将Update请求转换为Index或者Delete请求。首先,会通过GetRequest查询到已经存在的同_id Doc(如果有)的完整字段和值(依赖_source字段),然后和请求中的Doc合并。同时,这里会获取到读到的Doc版本号,记做V1。

  1. Parse Doc

这里会解析Doc中各个字段。生成ParsedDocument对象,同时会生成uid Term。在Elasticsearch中,_uid = type # _id,对用户,_Id可见,而Elasticsearch中存储的是_uid。这一部分生成的ParsedDocument中也有Elasticsearch的系统字段,大部分会根据当前内容填充,部分未知的会在后面继续填充ParsedDocument。

  1. Update Mapping

Elasticsearch中有个自动更新Mapping的功能,就在这一步生效。会先挑选出Mapping中未包含的新Field,然后判断是否运行自动更新Mapping,如果允许,则更新Mapping。

  1. Get Sequence Id and Version

由于当前是Primary Shard,则会从SequenceNumber Service获取一个sequenceID和Version。SequenceID在Shard级别每次递增1,SequenceID在写入Doc成功后,会用来初始化LocalCheckpoint。Version则是根据当前Doc的最大Version递增1。

  1. Add Doc To Lucene

这一步开始的时候会给特定_uid加锁,然后判断该_uid对应的Version是否等于之前Translate Update To Index步骤里获取到的Version,如果不相等,则说明刚才读取Doc后,该Doc发生了变化,出现了版本冲突,这时候会抛出一个VersionConflict的异常,该异常会在Primary Node最开始处捕获,重新从“Translate Update To Index or Delete”开始执行。

如果Version相等,则继续执行,如果已经存在同id的Doc,则会调用Lucene的UpdateDocument(uid, doc)接口,先根据uid删除Doc,然后再Index新Doc。如果是首次写入,则直接调用Lucene的AddDocument接口完成Doc的Index,AddDocument也是通过UpdateDocument实现。

这一步中有个问题是,如何保证Delete-Then-Add的原子性,怎么避免中间状态时被Refresh?答案是在开始Delete之前,会加一个Refresh Lock,禁止被Refresh,只有等Add完后释放了Refresh Lock后才能被Refresh,这样就保证了Delete-Then-Add的原子性。

Lucene的UpdateDocument接口中就只是处理多个Field,会遍历每个Field逐个处理,处理顺序是invert index,store field,doc values,point dimension,后续会有文章专门介绍Lucene中的写入。

  1. Write Translog

写完Lucene的Segment后,会以keyvalue的形式写TransLog,Key是_id,Value是Doc内容。当查询的时候,如果请求是GetDocByID,则可以直接根据_id从TransLog中读取到,满足NoSQL场景下的实时性要去。

需要注意的是,这里只是写入到内存的TransLog,是否Sync到磁盘的逻辑还在后面。

这一步的最后,会标记当前SequenceID已经成功执行,接着会更新当前Shard的LocalCheckPoint。

  1. Renew Bulk Request

这里会重新构造Bulk Request,原因是前面已经将UpdateRequest翻译成了Index或Delete请求,则后续所有Replica中只需要执行Index或Delete请求就可以了,不需要再执行Update逻辑,一是保证Replica中逻辑更简单,性能更好,二是保证同一个请求在Primary和Replica中的执行结果一样。

  1. Flush Translog

这里会根据TransLog的策略,选择不同的执行方式,要么是立即Flush到磁盘,要么是等到以后再Flush。Flush的频率越高,可靠性越高,对写入性能影响越大。

  1. Send Requests To Replicas

这里会将刚才构造的新的Bulk Request并行发送给多个Replica,然后等待Replica的返回,这里需要等待所有Replica返回后(可能有成功,也有可能失败),Primary Node才会返回用户。如果某个Replica失败了,则Primary会给Master发送一个Remove Shard请求,要求Master将该Replica Shard从可用节点中移除。

这里,同时会将SequenceID,PrimaryTerm,GlobalCheckPoint等传递给Replica。

发送给Replica的请求中,Action Name等于原始ActionName + [R],这里的R表示Replica。通过这个[R]的不同,可以找到处理Replica请求的Handler。

  1. Receive Response From Replicas

Replica中请求都处理完后,会更新Primary Node的LocalCheckPoint。

Replica Node

Replica 请求的入口是在ReplicaOperationTransportHandler的messageReceived,我们来看一下相关的逻辑流程。

  1. Index or Delete

根据请求类型是Index还是Delete,选择不同的执行逻辑。这里没有Update,是因为在Primary Node中已经将Update转换成了Index或Delete请求了。

  1. Parse Doc

  2. Update Mapping

以上都和Primary Node中逻辑一致。

  1. Get Sequence Id and Version

Primary Node中会生成Sequence ID和Version,然后放入ReplicaRequest中,这里只需要从Request中获取到就行。

  1. Add Doc To Lucene

由于已经在Primary Node中将部分Update请求转换成了Index或Delete请求,这里只需要处理Index和Delete两种请求,不再需要处理Update请求了。比Primary Node会更简单一些。

  1. Write Translog

  2. Flush Translog

以上都和Primary Node中逻辑一致。

ES写入总结

上面详细介绍了Elasticsearch的写入流程及其各个流程的工作机制,我们在这里再次总结下之前提出的分布式系统中的六大特性:

可靠性:由于Lucene的设计中不考虑可靠性,在Elasticsearch中通过Replica和TransLog两套机制保证数据的可靠性。
一致性:Lucene中的Flush锁只保证Update接口里面Delete和Add中间不会Flush,但是Add完成后仍然有可能立即发生Flush,导致Segment可读。这样就没法保证Primary和所有其他Replica可以同一时间Flush,就会出现查询不稳定的情况,这里只能实现最终一致性。
原子性:Add和Delete都是直接调用Lucene的接口,是原子的。当部分更新时,使用Version和锁保证更新是原子的。
隔离性:仍然采用Version和局部锁来保证更新的是特定版本的数据。
实时性:使用定期Refresh Segment到内存,并且Reopen Segment方式保证搜索可以在较短时间(比如1秒)内被搜索到。通过将未刷新到磁盘数据记入TransLog,保证对未提交数据可以通过ID实时访问到。
性能:性能是一个系统性工程,所有环节都要考虑对性能的影响,在Elasticsearch中,在很多地方的设计都考虑到了性能,一是不需要所有Replica都返回后才能返回给用户,只需要返回特定数目的就行;二是生成的Segment现在内存中提供服务,等一段时间后才刷新到磁盘,Segment在内存这段时间的可靠性由TransLog保证;三是TransLog可以配置为周期性的Flush,但这个会给可靠性带来伤害;四是每个线程持有一个Segment,多线程时相互不影响,相互独立,性能更好;五是系统的写入流程对版本依赖较重,读取频率较高,因此采用了versionMap,减少热点数据的多次磁盘IO开销。Lucene中针对性能做了大量的优化。后面我们也会有文章专门介绍Lucene中的优化思路。

Elasticsearch分布式一致性原理剖析-节点

(全文转载自https://zhuanlan.zhihu.com/p/34858035)

Elasticsearch分布式一致性原理剖析-节点目录

Elasticsearch分布式一致性原理剖析”系列将会对Elasticsearch的分布式一致性原理进行详细的剖析,介绍其实现方式、原理以及其存在的问题等(基于6.2版本)。

ES目前是最流行的分布式搜索引擎系统,其使用Lucene作为单机存储引擎并提供强大的搜索查询能力。学习其搜索原理,则必须了解Lucene,而学习ES的架构,就必须了解其分布式如何实现,而一致性是分布式系统的核心之一。

本篇将介绍ES的集群组成、节点发现与Master选举,错误检测与扩缩容相关的内容。ES在处理节点发现与Master选举等方面没有选择Zookeeper等外部组件,而是自己实现的一套,本文会介绍ES的这套机制是如何工作的,存在什么问题。本文的主要内容如下:

  1. ES集群构成
  2. 节点发现
  3. Master选举
  4. 错误检测
  5. 集群扩缩容
  6. 与Zookeeper、raft等实现方式的比较

ES集群构成

首先,一个Elasticsearch集群(下面简称ES集群)是由许多节点(Node)构成的,Node可以有不同的类型,通过以下配置,可以产生四种不同类型的Node:

conf/elasticsearch.yml:
    node.master: true/false
    node.data: true/false

四种不同类型的Node是一个node.master和node.data的true/false的两两组合。当然还有其他类型的Node,比如IngestNode(用于数据预处理等),不在本文讨论范围内。

当node.master为true时,其表示这个node是一个master的候选节点,可以参与选举,在ES的文档中常被称作master-eligible node,类似于MasterCandidate。ES正常运行时只能有一个master(即leader),多于1个时会发生脑裂。

当node.data为true时,这个节点作为一个数据节点,会存储分配在该node上的shard的数据并负责这些shard的写入、查询等。

此外,任何一个集群内的node都可以执行任何请求,其会负责将请求转发给对应的node进行处理,所以当node.master和node.data都为false时,这个节点可以作为一个类似proxy的节点,接受请求并进行转发、结果聚合等。

上图是一个ES集群的示意图,其中NodeA是当前集群的Master,NodeB和NodeC是Master的候选节点,其中NodeA和NodeB同时也是数据节点(DataNode),此外,NodeD是一个单纯的数据节点,Node_E是一个proxy节点。每个Node会跟其他所有Node建立连接。

到这里,我们提一个问题,供读者思考:一个ES集群应当配置多少个master-eligible node,当集群的存储或者计算资源不足,需要扩容时,新扩上去的节点应该设置为何种类型?

节点发现

ZenDiscovery是ES自己实现的一套用于节点发现和选主等功能的模块,没有依赖Zookeeper等工具,官方文档:

https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery-zen.html

简单来说,节点发现依赖以下配置:

conf/elasticsearch.yml:
    discovery.zen.ping.unicast.hosts: [1.1.1.1, 1.1.1.2, 1.1.1.3]

这个配置可以看作是,在本节点到每个hosts中的节点建立一条边,当整个集群所有的node形成一个联通图时,所有节点都可以知道集群中有哪些节点,不会形成孤岛。

官方推荐这里设置为所有的master-eligible node,读者可以想想这样有何好处:

It is recommended that the unicast hosts list be maintained as the list of master-eligible

Master选举

上面提到,集群中可能会有多个master-eligible node,此时就要进行master选举,保证只有一个当选master。如果有多个node当选为master,则集群会出现脑裂,脑裂会破坏数据的一致性,导致集群行为不可控,产生各种非预期的影响。

为了避免产生脑裂,ES采用了常见的分布式系统思路,保证选举出的master被多数派(quorum)的master-eligible node认可,以此来保证只有一个master。这个quorum通过以下配置进行配置:

conf/elasticsearch.yml:
    discovery.zen.minimum_master_nodes: 2

这个配置对于整个集群非常重要。

master选举谁发起,什么时候发起?

master选举当然是由master-eligible节点发起,当一个master-eligible节点发现满足以下条件时发起选举:

该master-eligible节点的当前状态不是master。
该master-eligible节点通过ZenDiscovery模块的ping操作询问其已知的集群其他节点,没有任何节点连接到master。
包括本节点在内,当前已有超过minimum_master_nodes个节点没有连接到master。
总结一句话,即当一个节点发现包括自己在内的多数派的master-eligible节点认为集群没有master时,就可以发起master选举。

当需要选举master时,选举谁?

首先是选举谁的问题,如下面源码所示,选举的是排序后的第一个MasterCandidate(即master-eligible node)。

public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
        assert hasEnoughCandidates(candidates);
        List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
        sortedCandidates.sort(MasterCandidate::compare);
        return sortedCandidates.get(0);
    }

那么是按照什么排序的?

public static int compare(MasterCandidate c1, MasterCandidate c2) {
    // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
    // list, so if c2 has a higher cluster state version, it needs to come first.
    int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
    if (ret == 0) {
        ret = compareNodes(c1.getNode(), c2.getNode());
    }
    return ret;
}

如上面源码所示,先根据节点的clusterStateVersion比较,clusterStateVersion越大,优先级越高。clusterStateVersion相同时,进入compareNodes,其内部按照节点的Id比较(Id为节点第一次启动时随机生成)。

总结一下:

当clusterStateVersion越大,优先级越高。这是为了保证新Master拥有最新的clusterState(即集群的meta),避免已经commit的meta变更丢失。因为Master当选后,就会以这个版本的clusterState为基础进行更新。(一个例外是集群全部重启,所有节点都没有meta,需要先选出一个master,然后master再通过持久化的数据进行meta恢复,再进行meta同步)。
当clusterStateVersion相同时,节点的Id越小,优先级越高。即总是倾向于选择Id小的Node,这个Id是节点第一次启动时生成的一个随机字符串。之所以这么设计,应该是为了让选举结果尽可能稳定,不要出现都想当master而选不出来的情况。

什么时候选举成功?

当一个master-eligible node(我们假设为Node_A)发起一次选举时,它会按照上述排序策略选出一个它认为的master。

假设Node_A选Node_B当Master:
Node_A会向Node_B发送join请求,那么此时:

(1) 如果Node_B已经成为Master,Node_B就会把Node_A加入到集群中,然后发布最新的cluster_state, 最新的cluster_state就会包含Node_A的信息。相当于一次正常情况的新节点加入。对于Node_A,等新的cluster_state发布到Node_A的时候,Node_A也就完成join了。

(2) 如果Node_B在竞选Master,那么Node_B会把这次join当作一张选票。对于这种情况,Node_A会等待一段时间,看Node_B是否能成为真正的Master,直到超时或者有别的Master选成功。

(3) 如果Node_B认为自己不是Master(现在不是,将来也选不上),那么Node_B会拒绝这次join。对于这种情况,Node_A会开启下一轮选举。

假设Node_A选自己当Master:
此时NodeA会等别的node来join,即等待别的node的选票,当收集到超过半数的选票时,认为自己成为master,然后变更cluster_state中的master node为自己,并向集群发布这一消息。

有兴趣的同学可以看看下面这段源码:

if (transportService.getLocalNode().equals(masterNode)) {
            final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
            logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
            nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                    new NodeJoinController.ElectionCallback() {
                        @Override
                        public void onElectedAsMaster(ClusterState state) {
                            synchronized (stateMutex) {
                                joinThreadControl.markThreadAsDone(currentThread);
                            }
                        }

                        @Override
                        public void onFailure(Throwable t) {
                            logger.trace("failed while waiting for nodes to join, rejoining", t);
                            synchronized (stateMutex) {
                                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                            }
                        }
                    }

            );
        } else {
            // process any incoming joins (they will fail because we are not the master)
            nodeJoinController.stopElectionContext(masterNode + " elected");

            // send join request
            final boolean success = joinElectedMaster(masterNode);

            synchronized (stateMutex) {
                if (success) {
                    DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
                    if (currentMasterNode == null) {
                        // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                        // a valid master.
                        logger.debug("no master node is set, despite of join request completing. retrying pings.");
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    } else if (currentMasterNode.equals(masterNode) == false) {
                        // update cluster state
                        joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                    }

                    joinThreadControl.markThreadAsDone(currentThread);
                } else {
                    // failed to join. Try again...
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                }
            }
        }

按照上述流程,我们描述一个简单的场景来帮助大家理解:

假如集群中有3个master-eligible node,分别为Node_A、 Node_B、 Node_C, 选举优先级也分别为Node_A、Node_B、Node_C。三个node都认为当前没有master,于是都各自发起选举,选举结果都为Node_A(因为选举时按照优先级排序,如上文所述)。于是Node_A开始等join(选票),Node_B、Node_C都向Node_A发送join,当Node_A接收到一次join时,加上它自己的一票,就获得了两票了(超过半数),于是Node_A成为Master。此时cluster_state(集群状态)中包含两个节点,当Node_A再收到另一个节点的join时,cluster_state包含全部三个节点。

选举怎么保证不脑裂?

基本原则还是多数派的策略,如果必须得到多数派的认可才能成为Master,那么显然不可能有两个Master都得到多数派的认可。

上述流程中,master候选人需要等待多数派节点进行join后才能真正成为master,就是为了保证这个master得到了多数派的认可。但是我这里想说的是,上述流程在绝大部份场景下没问题,听上去也非常合理,但是却是有bug的。

因为上述流程并没有限制在选举过程中,一个Node只能投一票,那么什么场景下会投两票呢?比如NodeB投NodeA一票,但是NodeA迟迟不成为Master,NodeB等不及了发起了下一轮选主,这时候发现集群里多了个Node0,Node0优先级比NodeA还高,那NodeB肯定就改投Node0了。假设Node0和NodeA都处在等选票的环节,那显然这时候NodeB其实发挥了两票的作用,而且投给了不同的人。

那么这种问题应该怎么解决呢,比如raft算法中就引入了选举周期(term)的概念,保证了每个选举周期中每个成员只能投一票,如果需要再投就会进入下一个选举周期,term+1。假如最后出现两个节点都认为自己是master,那么肯定有一个term要大于另一个的term,而且因为两个term都收集到了多数派的选票,所以多数节点的term是较大的那个,保证了term小的master不可能commit任何状态变更(commit需要多数派节点先持久化日志成功,由于有term检测,不可能达到多数派持久化条件)。这就保证了集群的状态变更总是一致的。

而ES目前(6.2版本)并没有解决这个问题,构造类似场景的测试case可以看到会选出两个master,两个node都认为自己是master,向全集群发布状态变更,这个发布也是两阶段的,先保证多数派节点“接受”这次变更,然后再要求全部节点commit这次变更。很不幸,目前两个master可能都完成第一个阶段,进入commit阶段,导致节点间状态出现不一致,而在raft中这是不可能的。那么为什么都能完成第一个阶段呢,因为第一个阶段ES只是将新的cluster_state做简单的检查后放入内存队列,如果当前cluster_state的master为空,不会对新的clusterstate中的master做检查,即在接受了NodeA成为master的cluster_state后(还未commit),还可以继续接受NodeB成为master的cluster_state。这就使NodeA和NodeB都能达到commit条件,发起commit命令,从而将集群状态引向不一致。当然,这种脑裂很快会自动恢复,因为不一致发生后某个master再次发布cluster_state时就会发现无法达到多数派条件,或者是发现它的follower并不构成多数派而自动降级为candidate等。

这里要表达的是,ES的ZenDiscovery模块与成熟的一致性方案相比,在某些特殊场景下存在缺陷,下一篇文章讲ES的meta变更流程时也会分析其他的ES无法满足一致性的场景。

错误检测

MasterFaultDetection与NodesFaultDetection

这里的错误检测可以理解为类似心跳的机制,有两类错误检测,一类是Master定期检测集群内其他的Node,另一类是集群内其他的Node定期检测当前集群的Master。检查的方法就是定期执行ping请求。ES文档:

There are two fault detection processes running. The first is by the master, to ping all the other nodes in the cluster and verify that they are alive. And on the other end, each node pings to master to verify if its still alive or an election process needs to be initiated.
如果Master检测到某个Node连不上了,会执行removeNode的操作,将节点从cluste_state中移除,并发布新的cluster_state。当各个模块apply新的cluster_state时,就会执行一些恢复操作,比如选择新的primaryShard或者replica,执行数据复制等。

如果某个Node发现Master连不上了,会清空pending在内存中还未commit的new cluster_state,然后发起rejoin,重新加入集群(如果达到选举条件则触发新master选举)。

rejoin

除了上述两种情况,还有一种情况是Master发现自己已经不满足多数派条件(>=minimumMasterNodes)了,需要主动退出master状态(退出master状态并执行rejoin)以避免脑裂的发生,那么master如何发现自己需要rejoin呢?

上面提到,当有节点连不上时,会执行removeNode。在执行removeNode时判断剩余的Node是否满足多数派条件,如果不满足,则执行rejoin。

if (electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes()) == false) {
            final int masterNodes = electMasterService.countMasterNodes(remainingNodesClusterState.nodes());
            rejoin.accept(LoggerMessageFormat.format("not enough master nodes (has [{}], but needed [{}])",
                                                     masterNodes, electMasterService.minimumMasterNodes()));
            return resultBuilder.build(currentState);
        } else {
            return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks)));
        }

在publish新的cluster_state时,分为send阶段和commit阶段,send阶段要求多数派必须成功,然后再进行commit。如果在send阶段没有实现多数派返回成功,那么可能是有了新的master或者是无法连接到多数派个节点等,则master需要执行rejoin。

try {
        publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
    } catch (FailedToCommitClusterStateException t) {
        // cluster service logs a WARN message
        logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])",
            newState.version(), electMaster.minimumMasterNodes());

        synchronized (stateMutex) {
            pendingStatesQueue.failAllStatesAndClear(
                new ElasticsearchException("failed to publish cluster state"));

            rejoin("zen-disco-failed-to-publish");
        }
        throw t;
    }

在对其他节点进行定期的ping时,发现有其他节点也是master,此时会比较本节点与另一个master节点的cluster_state的version,谁的version大谁成为master,version小的执行rejoin。

if (otherClusterStateVersion > localClusterState.version()) {
        rejoin("zen-disco-discovered another master with a new cluster_state [" + otherMaster + "][" + reason + "]");
    } else {
        // TODO: do this outside mutex
        logger.warn("discovered [{}] which is also master but with an older cluster_state, telling [{}] to rejoin the cluster ([{}])", otherMaster, otherMaster, reason);
        try {
            // make sure we're connected to this node (connect to node does nothing if we're already connected)
            // since the network connections are asymmetric, it may be that we received a state but have disconnected from the node
            // in the past (after a master failure, for example)
            transportService.connectToNode(otherMaster);
            transportService.sendRequest(otherMaster, DISCOVERY_REJOIN_ACTION_NAME, new RejoinClusterRequest(localClusterState.nodes().getLocalNodeId()), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

                @Override
                public void handleException(TransportException exp) {
                    logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to send rejoin request to [{}]", otherMaster), exp);
                }
            });
        } catch (Exception e) {
            logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to send rejoin request to [{}]", otherMaster), e);
        }
    }

集群扩缩容

上面讲了节点发现、Master选举、错误检测等机制,那么现在我们可以来看一下如何对集群进行扩缩容。

扩容DataNode

假设一个ES集群存储或者计算资源不够了,我们需要进行扩容,这里我们只针对DataNode,即配置为:

conf/elasticsearch.yml:
    node.master: false
    node.data: true

然后需要配置集群名、节点名等其他配置,为了让该节点能够加入集群,我们把discovery.zen.ping.unicast.hosts配置为集群中的master-eligible node。

conf/elasticsearch.yml:
    cluster.name: es-cluster
    node.name: node_Z
    discovery.zen.ping.unicast.hosts: ["x.x.x.x", "x.x.x.y", "x.x.x.z"]

然后启动节点,节点会自动加入到集群中,集群会自动进行rebalance,或者通过reroute api进行手动操作。

https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-reroute.html

https://www.elastic.co/guide/en/elasticsearch/reference/current/shards-allocation.html

缩容DataNode

假设一个ES集群使用的机器数太多了,需要缩容,我们怎么安全的操作来保证数据安全,并且不影响可用性呢?

首先,我们选择需要缩容的节点,注意本节只针对DataNode的缩容,MasterNode缩容涉及到更复杂的问题,下面再讲。

然后,我们需要把这个Node上的Shards迁移到其他节点上,方法是先设置allocation规则,禁止分配Shard到要缩容的机器上,然后让集群进行rebalance。

PUT _cluster/settings
{
  "transient" : {
    "cluster.routing.allocation.exclude._ip" : "10.0.0.1"
  }
}

等这个节点上的数据全部迁移完成后,节点可以安全下线。

更详细的操作方式可以参考官方文档:

https://www.elastic.co/guide/en/elasticsearch/reference/current/allocation-filtering.html

扩容MasterNode

假如我们想扩容一个MasterNode(master-eligible node), 那么有个需要考虑的问题是,上面提到为了避免脑裂,ES是采用多数派的策略,需要配置一个quorum数:

conf/elasticsearch.yml:
    discovery.zen.minimum_master_nodes: 2

假设之前3个master-eligible node,我们可以配置quorum为2,如果扩容到4个master-eligible node,那么quorum就要提高到3。

所以我们应该先把discovery.zen.minimum_master_nodes这个配置改成3,再扩容master,更改这个配置可以通过API的方式:

curl -XPUT localhost:9200/_cluster/settings -d '{
    "persistent" : {
        "discovery.zen.minimum_master_nodes" : 3
    }
}

这个API发送给当前集群的master,然后新的值立即生效,然后master会把这个配置持久化到cluster meta中,之后所有节点都会以这个配置为准。

但是这种方式有个问题在于,配置文件中配置的值和cluster meta中的值很可能出现不一致,不一致很容易导致一些奇怪的问题,比如说集群重启后,在恢复cluster meta前就需要进行master选举,此时只可能拿配置中的值,拿不到cluster meta中的值,但是cluster meta恢复后,又需要以cluster meta中的值为准,这中间肯定存在一些正确性相关的边界case。

总之,动master节点以及相关的配置一定要谨慎,master配置错误很有可能导致脑裂甚至数据写坏、数据丢失等场景。

缩容MasterNode

缩容MasterNode与扩容跟扩容是相反的流程,我们需要先把节点缩下来,再把quorum数调下来,不再详细描述

与Zookeeper、raft等实现方式的比较

与使用Zookeeper相比

本篇讲了ES集群中节点相关的几大功能的实现方式:

  1. 节点发现
  2. Master选举
  3. 错误检测
  4. 集群扩缩容

试想下,如果我们使用Zookeeper来实现这几个功能,会带来哪些变化?

Zookeeper介绍

我们首先介绍一下Zookeeper,熟悉的同学可以略过。

Zookeeper分布式服务框架是Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。

简单来说,Zookeeper就是用于管理分布式系统中的节点、配置、状态,并完成各个节点间配置和状态的同步等。大量的分布式系统依赖Zookeeper或者是类似的组件。

Zookeeper通过目录树的形式来管理数据,每个节点称为一个znode,每个znode由3部分组成:

  • stat. 此为状态信息, 描述该znode的版本, 权限等信息.
  • data. 与该znode关联的数据.
  • children. 该znode下的子节点.

stat中有一项是ephemeralOwner,如果有值,代表是一个临时节点,临时节点会在session结束后删除,可以用来辅助应用进行master选举和错误检测。

Zookeeper提供watch功能,可以用于监听相应的事件,比如某个znode下的子节点的增减,某个znode本身的增减,某个znode的更新等。

怎么使用Zookeeper实现ES的上述功能
  1. 节点发现:每个节点的配置文件中配置一下Zookeeper服务器的地址,节点启动后到Zookeeper中某个目录中注册一个临时的znode。当前集群的master监听这个目录的子节点增减的事件,当发现有新节点时,将新节点加入集群。
  2. master选举:当一个master-eligible node启动时,都尝试到固定位置注册一个名为master的临时znode,如果注册成功,即成为master,如果注册失败则监听这个znode的变化。当master出现故障时,由于是临时znode,会自动删除,这时集群中其他的master-eligible node就会尝试再次注册。使用Zookeeper后其实是把选master变成了抢master。
  3. 错误检测:由于节点的znode和master的znode都是临时znode,如果节点故障,会与Zookeeper断开session,znode自动删除。集群的master只需要监听znode变更事件即可,如果master故障,其他的候选master则会监听到master znode被删除的事件,尝试成为新的master。
  4. 集群扩缩容:扩缩容将不再需要考虑minimum_master_nodes配置的问题,会变得更容易。
使用Zookeeper的优劣点

使用Zookeeper的好处是,把一些复杂的分布式一致性问题交给Zookeeper来做,ES本身的逻辑就可以简化很多,正确性也有保证,这也是大部分分布式系统实践过的路子。而ES的这套ZenDiscovery机制经历过很多次bug fix,到目前仍有一些边角的场景存在bug,而且运维也不简单。

那为什么ES不使用Zookeeper呢,大概是官方开发觉得增加Zookeeper依赖后会多依赖一个组件,使集群部署变得更复杂,用户在运维时需要多运维一个Zookeeper。

那么在自主实现这条路上,还有什么别的算法选择吗?当然有的,比如raft。

与使用raft相比

raft算法是近几年很火的一个分布式一致性算法,其实现相比paxos简单,在各种分布式系统中也得到了应用。这里不再描述其算法的细节,我们单从master选举算法角度,比较一下raft与ES目前选举算法的异同点:

相同点
  1. 多数派原则:必须得到超过半数的选票才能成为master。
  2. 选出的leader一定拥有最新已提交数据:在raft中,数据更新的节点不会给数据旧的节点投选票,而当选需要多数派的选票,则当选人一定有最新已提交数据。在es中,version大的节点排序优先级高,同样用于保证这一点。
不同点
  1. 正确性论证:raft是一个被论证过正确性的算法,而ES的算法是一个没有经过论证的算法,只能在实践中发现问题,做bug fix,这是我认为最大的不同。
  2. 是否有选举周期term:raft引入了选举周期的概念,每轮选举term加1,保证了在同一个term下每个参与人只能投1票。ES在选举时没有term的概念,不能保证每轮每个节点只投一票。
  3. 选举的倾向性:raft中只要一个节点拥有最新的已提交的数据,则有机会选举成为master。在ES中,version相同时会按照NodeId排序,总是NodeId小的人优先级高。
    看法
    raft从正确性上看肯定是更好的选择,而ES的选举算法经过几次bug fix也越来越像raft。当然,在ES最早开发时还没有raft,而未来ES如果继续沿着这个方向走很可能最终就变成一个raft实现。

Elasticsearch分布式一致性原理剖析(一)-节点篇

Elasticsearch内核解析 - 写入篇

ElasticSearch权威指南

RabbitMQ基本概念

生产者和消费者

Producer:生产者,投递消息的一方。

Consumer: 消费者,就是接收消息的一方。

Broker: 消息中间件的服务节点 。

队列

Queue: 队列,是RabbitMQ 的内部对象,用于存储消息。

RabbitMQ中消息都只能存储在队列中。

多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊 CRound-Robin ,即轮询)
给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

RabbitMQ 不支持队列层面的广播消费。

交换器、路由键与绑定

Exchange: 交换器。生产者将消息发送到 Exchange (交换器,通常也
可以用大写的 “X” 来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。

RoutingKey: 路由键 。生产者将消息发给交换器 的时候,一般会指定一个RoutingKey ,用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键 (BindingKey) 联
合使用才能最终生效。

Binding: 绑定。RabbitMQ 中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键 ( BindingKey ) ,这样 RabbitMQ 就知道如何正确地将消息路由到队列了。

在交换器类型和绑定键 (BindingKey) 固定的情况下,生产者可以在发送消息给交换器时,通过指定RoutingKey来决定消息流向哪里。

交换器类型

RabbitMQ 常用的交换器类型有 fanout 、 direct 、 topic 、 headers 这四种 。

fanout

它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。

direct

direct 类型的交换器路由规则也很简单,它会把消息路由到那些 BindingKey 和 RoutingKey
完全匹配的队列中。

topic

topic 类型的交换器在匹配规则上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:

  • RoutingKey 为一个点号”.”分隔的字符串(被点号”.”分隔开的每一段独立的字符串称为一个单词),如com.rabbitmq.client
  • BindingKey 和 RoutingKey 一样也是点号”.”分隔的字符串;
  • BindingKey 中可以存在两种特殊字符串”“和”#”,用于做模糊匹配,其中”“用于匹配一个单词,”#”用于匹配多规格单词(可以是零个)。

headers

headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中
的 headers 属性进行匹配。在绑定队列和交换器时制定一组键值对 , 当发送消息到交换器时,
RabbitMQ 会获取到该消息的 headers (也是一个键值对的形式) ,对比其中的键值对是否完全
匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由
到该队列 。 headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。

RabbitMQ 运转流程

生产者发送消息:

  1. 生产者连接到 RabbitMQ Broker,建立一个连接(Connection),开启一个信道 (Channel)
  2. 生产者声明一个交换器 ,并设置相关属性,比如 交换机类型、是否持久化等。
  3. 生产者声明 一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等。
  4. 生产者通过路由键将交换器和队列绑定起来。
  5. 生产者发送消息至 RabbitMQ Broker,其中包含路由键、交换器等信息。
  6. 相应的交换器根据接收到的路由键查找相匹配的队列 。
  7. 如果找到,则将从生产者发送过来的消息存入相应的队列中。
  8. 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者。
  9. 关闭信道。
  10. 关闭连接。

消费者接收消息的过程:

  1. 消费者连接到 RabbitMQ Broker,建立一个连接 (Connection ) ,开启 一个信道 (Channel) 。
  2. 消费者向 RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,
    以及做一些准备工作(详细内容请参考 3 .4节〉。
  3. 等待 RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
  4. 消费者确认 ( ack) 接收到的消息 。
  5. RabbitMQ 从队列中删除相应己经被确认的消息 。
  6. 关闭信道。
  7. 关闭连接。

RabbitMQ进阶

消息特殊情况

mandatory 和 immediate 是 channel . basicPublish 方法中的两个参数,它们都有
当消息传递过程中不可达目的地时将消息返回给生产者的功能。 RabbitMQ 提供的备份交换器
(Altemate Exchange) 可以将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定〉存
储起来,而不用返回给客户端。

mandatory参数

当 mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件
的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者 。当 mandatory 参
数设置为 false 时,出现上述情形,则消息直接被丢弃 。

那么生产者如何获取到没有被正确路由到合适队列的消息呢?这时候可以通过调用
channel.addReturnListener 来添加 ReturnListener 监昕器实现。

immediate参数

当 imrnediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在
任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时 ,
该消息会通过 Basic.Return 返回至生产者。

RabbitMQ3.0版本开始去掉了对 imrnediate 参数的支持,对此 RabbitMQ 官方解释是:imrnediate 参数会影响镜像队列的性能,增加了代码复杂性,建议采用TTL和DLX 的方法替代。

备份交换器

备份交换器,英文名称为 Altemate Exchange,简称庙,或者更直白地称之为”备胎交换器”。
生产者在发送消息的时候如果不设置 mandatory 参数 , 那么消息在未被路由的情况下将会丢失 :
如果设置了 mandatory 参数,那么需要添加 ReturnListener 的编程逻辑,生产者的代码将
变得复杂。如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,
这样可以将未被路由的消息存储在 RabbitMQ 中,再在需要的时候去处理这些消息。

可以通过在声明交换器(调用channel.exchangeDeclare方法)的时候添加alternate-exchange 参数来实现,也可以通过策略的方式实现。如果两者同时使用,则前者的优先级更高,会覆盖掉 Policy 的设置 。

TTL

设置消息的 TTL

目前有两种方法可以设置消息的 TTL。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息本身进行单独设置,每条消息的 TTL 可以不同。如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置 的 TTL 值时,就会变成”死信” (Dead Message) ,消费者将无法再收到该消息。

  • 通过队列属性设置消息 TTL 的方法是在 channel.queueDeclare 方法中加入
    x-message -ttl 参数实现的,这个参数的单位是毫秒。

      Map<String, Object> argss = new HashMap<String , Object>();
      argss.put("x-message-ttl " , 6000);
      channel.queueDeclare(queueName , durable , exclusive , autoDelete , argss) ;
      //同时也可以通过 Policy 的方式来设置 TTL.示例如下 :
      rabbitmqctl set_policy TTL "食" '{"message-ttl":60000}' --apply-to queues
      //还可以通过调用 HTTPAPI 接口设置 :
      $ curl -i -u root:root -H "content-type:application/json"-X PUT
      -d'{"auto_delete":false, "durable":true, "arguments":{"x-message-ttl": 60000}}'
      http://localhost:15672/api/queues/{vhost}/{queuename}
    
  • 针对每条消息设置 TTL 的方法是在 channel.basicPublish 方法中加入 expiration
    的属性参数,单位为毫秒。

      AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties . Builder();
      builder.deliveryMode(2); / / 持久化消息
      builder.expiration( " 60000 " );/ / 设置 TTL=60000ms
      AMQP.BasicProperties properties = builder.build() ;
      channel.basicPublish(exchangeName, routingKey, mandatory, properties ,
      "ttlTestMessage".getBytes());
    

如果不设置 TTL.则表示此消息不会过期 ;如果将 TTL 设置为 0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃,这个特性可以部分替代 RabbitMQ 3.0 版本之前的 immediate 参数,之所以部分代替,是因为immediate 参数在投递失败时会用Basic.Return 将消息返回。

对于第一种设置队列 TTL 属性的方法,一旦消息过期,就会从队列中抹去,而在第二种方
法中,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费
者之前判定的。

为什么这两种方法处理的方式不一样?因为第一种方法里,队列中己过期的消息肯定在队
列头部, RabbitMQ 只要定期从队头开始扫描是否有过期的消息即可。而第二种方法里,每条消
息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将
被消费时再判定是否过期 , 如果过期再进行删除即可。

设置队列的TTL

通过 channel.queueDeclare 方法中的 x-expires 参数可以控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过Basic.Get命令。

abb itMQ 会确保在过期时间到达后将队列删除,但是不保障删除的动作有多及时 。在
RabbitMQ 重启后 , 持久化的队列的过期时间会被重新计算。
用于表示过期时间的 x-expires 参数以毫秒为单位 , 井且服从和 x-message-ttl 一样
的约束条件,不过不能设置为 0。比如该参数设置为 1 000 ,则表示该队列如果在 1 秒钟之内未
使用则会被删除。

死信队列

DLX ,全称为 Dead-Letter-Exchange ,可以称之为死信交换器,也有人称之为死信邮箱。当
消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个
交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。

DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去 ,进而被路由到另一个队列,即死信队列。可以监听这个队列中的消息、以进行相应的处理,这个特性与将消息的 TTL 设置为 0 配合使用可以弥补imrnediate参数功能。

延迟队列

延迟队列存储的对象是对应的延迟消息,所谓”延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费 。

在图 4-4 中,不仅展示的是死信队列的用法,也是延迟队列的用法,对于 queue.dlx 这个死信队列来说,同样可以看作延迟队列。假设一个应用中需要将每条消息都设置为 10 秒的延迟,生产者通过exchange.normal 这个交换器将发送的消息存储在 queue.normal 这个队列中。消费者订阅的并非是 queue.normal 这个队列,而是 queue.dlx 这个队列 。当消息从 queue.normal 这个队列中过期之后被存入 queue.dlx 这个队列中,消费者就恰巧消费到了延迟 10 秒的这条消息 。

优先队列

优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先
被消费的特权。可以通过设置队列的 x-max-priority 参数来实现。

RPC实现

RPC 的主要功用是让构建分布式计算更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性。

一般在 RabbitMQ 中进行 RPC 是很简单。客户端发送请求消息,服务端回复响应的消息 。为了接收响应的消息,我们需要在请求消息中发送一个回调队列。

这里就用到两个属性。

  • replyTo: 通常用来设置一个回调队列。
  • correlationId : 用来关联请求(request) 和其调用RPC之后的回复(response ) 。

RPC 的处理流程如下 :

  1. 当客户端启动时,创建一个匿名的回调队列(名称由 RabbitMQ 自动创建,图 4-7 中
    的回调队列为 amq.gen-LhQzlgv3GhDOv8PIDabOXA 。
  2. 客户端为 RPC 请求设置 2 个属性 : reply T o 用来告知 RPC 服务端回复请求时的目的
    队列,即回调队列; correlationld 用来标记一个请求。
  3. 请求被发送到 rpc_queue 队列中。
  4. RPC 服务端监听 rpc_queue 队列中的请求,当请求到来时 , 服务端会处理并且把带有
    结果的消息发送给客户端。 接收的队列就是 replyTo 设定 的 回调队列。
  5. 客户端监昕回调队列,当有消息时 , 检查 correlationld 属性,如果与请求匹配,
    那就是结果了。

持久化

RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化 。

交换器的持久化是通过在声明队列是将 durable 参数置为 true 实现的。

队列的持久化是通过在声明队列时将 durable 参数置为 true 实现的。

队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的
消息不会丢失。要确保消息不会丢失 , 需要将其设置为持久化。通过将消息的投递模式
(BasicProperties 中的deliveryMode 属性)设置为 2 即可实现消息的持久化。

生产者确认

消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达服务器。如果在消息到达服务器之前己经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达
服务器 ,何谈持久化?

RabbitMQ 针对这个问题,提供了两种解决方式:

  • 通过事务机制实现
  • 通过发送方确认publisher confirm 机制实现。

事务机制

Rabb itMQ 客户端中与事务机制相关的方法有 三 个: channel.txSelect 、channel .txCommit 和 channel.txRollbacko。
channel.txSelect 用于将当前的信道设置成事务模式。
channel.txCommit 用于提交事务。
channel.txRollback 用于事务回滚。

在通过 channel.txSelect 方法开启事务之后,我们便可以发布消息给 RabbitMQ 了,如果事务提交成功,则消息一定到达了 RabbitMQ 中,如果在事务提交执行之前由于 RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback 方法来实现事务回夜。注意这里的 RabbitMQ 中的事务机制与大多数数据库中的事务概念井不相同,需要注意区分。

务确实能够解决消息发送方和 RabbitMQ 之间消息确认的问题,只有消息成功被
RabbitMQ 接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚 ,与此同时可以进
行消息重发。但是使用事务机制会”吸干” RabbitMQ 的性能。

发送方确认机制

生产者将信道设置成 confirmn (确认)模式,一旦信道进入 confmn 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认 (Basic.Ack) 给生产者(包含消息的唯一 ID) ,这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。 RabbitMQ 回传给生产者的确认消息中的 deliveryTag 包含了确认消息的序号,此外 RabbitMQ 也可以设置 channel.basicAck 方法中的multiple 参数,表示到这个序号之前的所有消息都己经得到了处理。

发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack(Basic.Nack) 命令,生产者应用程序同样可以在回调方法中处理该 nack 命令。

RabbitMQ实战指南

环境

系统: centos7

Redis: redis-3.2.4

rbenv: 2.6.3

安装目录: /root/temp/

部署流程

  1. 下载redis并上传到服务器并解压到/root/temp/redis-3.2.4

  2. 编译redis,在/root/temp/redis-3.2.4 目录下输入make

  3. 安装ruby

    1. 安装rbenv

       #install build dependencies
       sudo yum install -y git-core zlib zlib-devel gcc-c ++ patch readline readline-devel libyaml-devel libffi-devel openssl-devel make bzip2 autoconf automake libtool bison curl sqlite-devel
       
       #clim并安装rbenv环境
       cd~
       git clone git://github.com/sstephenson/rbenv.git .rbenv
       git clone git://github.com/sstephenson/ruby-build.git~ / .rbenv / plugins / ruby​​-build
       echo'export PATH =“$ HOME / .rbenv / bin:$ HOME / .rbenv / plugins / ruby​​-build / bin:$ PATH”'>>〜/ .bash_profile
       echo'eval“$(rbenv init  - )”'>>〜/ .bash_profile
       
       #re-init bash
       source~ / .bash_profile
       
       #install最新的ruby
       rbenv install -v 2.6.3
       
       #设置shell将使用的默认ruby版本
       rbenv global 2.6.3
       
       #禁用生成文档,因为这需要花费很多时间
       echo“gem: -  no-document”>〜/ .gemrc
       
       #install bundler
       gem install bundler
       
       每次安装gem时都必须执行#,以便运行ruby可执行文件
       rbenv rehash
      
    2. 在使用上面的安装ruby如果出现无法下载文件,此时可以通过下载好ruby,我选择的是最新ruby-2.6.3,上传到服务器/root/temp/目录下,进入/root/.rbenv/plugins/ruby-build/share/ruby-build目录下,找到对应的文件也就是2.6.3,打开这个文件,修改

       install_package "openssl-1.0.2j" "https://www.openssl.org/source/openssl-1.0.2j.tar.gz#e7aff292be21c259c6af26469c7a9b3ba26e9abaaffd325e3dccc9785256c431" mac_openssl --if has_broken_mac_openssl
       install_package "ruby-2.6.3" "file:///root/temp/ruby-2.6.3.tar.gz" 
      
    3. 如果使用rvm来安装ruby,在本机上会出现CA证书过期无法下载文件。

  4. 配置redis集群

    1. /root/temp/redis-3.2.4目录下新建目录redis-cluster

    2. 在目录redis-cluster下创建7001 7002 7003 7004 7005 7006 目录

    3. /root/temp/redis-3.2.4目录下的redis.conf配置文件复制到7001 … 7006 目录下

    4. 分配修改7001-7006目录下的redis.conf 文件修改为对应的端口号

       port 7001 #要根据所在的子目录下配置
       daemonize yes
       pidfile /var/run/redis_7001.pid  #要根据所在的子目录下配置
       logfile "/var/log/redis-7001.log" #要根据所在的子目录下配置
       appendonly yes
       cluster-enabled yes
       cluster-config-file nodes-7001.conf #要根据所在的子目录下配置
       cluster-node-timeout 15000
      
    5. 启动redis - 在对应目录下输入命令,逐个启动

       cd  7001
       /root/temp/redis-3.2.4/src/redis-server redis.conf
       cd ..
       cd  7002
       /root/temp/redis-3.2.4/src/redis-server redis.conf
       cd ..
       cd  7003
       cd ..
       /root/temp/redis-3.2.4/src/redis-server redis.conf
       cd  7004
       cd ..
       /root/temp/redis-3.2.4/src/redis-server redis.conf
       cd ..
          cd  7005
       /root/temp/redis-3.2.4/src/redis-server redis.conf
       cd  7006
       /root/temp/redis-3.2.4/src/redis-server redis.conf
      
    6. 启动redis集群 — 在/root/temp/redis-3.2.4 目录下输入

       src/redis-trib create --replicas 1 127.0.0.1:7001  127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 127.0.0.1:7006
      

rbenv安装ruby2.3.0在线安装不上。老子出绝招了(更新)
https://gist.github.com/soardex/e95cdc230d1ac5b824b3


秒杀系统简介

秒杀场景一般会在电商网站举行一些活动或者节假日在12306网站上抢票时遇到。对于电商网站中一些稀缺或者特价商品,电商网站一般会在约定时间点对其进行限量销售,因为这些商品的特殊性,会吸引大量用户前来抢购,并且会在约定的时间点同时在秒杀页面进行抢购。

秒杀系统主要面临三大问题:    

一、瞬时的高并发访问。抢购和普通的电商销售有所不同,普通的电商销售,流量是比较平均的,虽然有波峰波谷,但不会特别突出。而抢购是在特定时间点进行的推销活动,抢购开始前,用户不断刷新页面,以获得购买按钮;抢购开始的一瞬间,集中并发购买。   

二、数据正确性。抢购毕竟是一种购买行为,需要购买、扣减库存、支付等复杂的流程,在此过程中,要保证数据的正确性,防止超卖(卖出量超过库存)的发生。   

三、防作弊。

秒杀系统的设计理念

秒杀系统的优化或者设计理念为以下几点:

限流: 鉴于只有少部分用户能够秒杀成功,所以要限制大部分流量,只允许少部分流量进入服务后端。

削峰:对于秒杀系统瞬时会有大量用户涌入,所以在抢购一开始会有很高的瞬间峰值。高峰值流量是压垮系统很重要的原因,所以如何把瞬间的高流量变成一段时间平稳的流量也是设计秒杀系统很重要的思路。实现削峰的常用的方法有利用缓存和消息中间件等技术。

异步处理:秒杀系统是一个高并发系统,采用异步处理模式可以极大地提高系统并发量,其实异步处理就是削峰的一种实现方式。

内存缓存:秒杀系统最大的瓶颈一般都是数据库读写,由于数据库读写属于磁盘IO,性能很低,如果能够把部分数据或业务逻辑转移到内存缓存,效率会有极大地提升。

可拓展:当然如果我们想支持更多用户,更大的并发,最好就将系统设计成弹性可拓展的,如果流量来了,拓展机器就好了。像淘宝、京东等双十一活动时会增加大量机器应对交易高峰。

前端方案

页面静态化: 将页面上的所有静态元素全部静态化,将其交给ngix管理,以此同时采用cdn来N来抗峰值;对于动态部分采用Ajex请求动态加载数据

静态资源优化:主要是讲多个css/js请求合并为一个等。

秒杀接口隐藏:在秒杀开始前一段时间才暴露出秒杀的接口(路径),同时在后端可以通过与用户id绑定生成每个用户的秒杀路径,保存在redis中,在固定时间中只允许固定的次数请求,过多的请求被拦截。这样防止使用脚本等进行大批量的请求。

验证码:主要是为了流量削峰与筛除简单脚本。通过用户输入验证码的时间,将在某一时间的突发流量均摊到随后的一段时间,还能筛除掉一部分简单的无法识别验证码的脚本。

后端方案

用户限流:通过对用户id绑定生成的秒杀路径,限定访问次数

redis缓存: 将数据库内容比如秒杀商品内容详情缓存到redis,不访问数据库

消息队列: 通过将请求缓存到消息队列,异步执行来削减同一时间到来的请求。

CDN:内容分发网络。其基本思路是尽可能避开互联网上有可能影响数据传输速度和稳定性的瓶颈和环节,使内容传输的更快、更稳定。简单的来说,就是把原服务器上数据复制到其他服务器上,用户访问时,那台服务器近访问到的就是那台服务器上的数据。CDN的劣势是内容的变更生效慢,所以仅适用于“几乎不变”的资源,例如引用的js包,图片等。

秒杀系统的具体实现

秒杀开始前:

只能看到看到秒杀商品详情,无法进入秒杀接口,直到秒杀开始前一段时间。

对进入秒杀商品详情页的用户将其uid与秒杀商品gid结合,生成该用户对该秒杀商品的秒杀接口路径,将其保存到redis中。

在秒杀商品详情页页面采用js脚本来实现倒计时与限制秒杀按钮的电机。

秒杀开始时:

服务端使用redis提前缓存秒杀商品详情,主要参数为库存数量。

使用redis来实现秒杀商品的预减,不直接访问数据库。当redis中的商品库存少于0,拒绝请求。这样能保证只有少量的请求能够接近数据库。

将抢到预减的请求放入到消息队列中,将同步转为异步执行,实现流量的再次削峰。

服务层从消息队列中拿出请求,进行数据库操作,实现真正的商品抢购事务,生成订单;若事务失败,回滚事务,告知用户抢购失败。若事务成功,请求用户确认订单详情,并支付。

注意事项:

防止买超: 需要对数据库字段进行设计,添加索引;在服务端需要对库存字段进行限制,保证不为负。

消息队列: 也可以使用redis来充当消息队列。

用户限流: 限制用户一段时间内能点击的次数

IP限流: 方法太过于粗暴,容易误封无辜用户,不推荐。

0%