在开发 Python 应用时,经常会使用到 Jupyter 来完成 Python 应用的开发及调试。简而言之,Jupyter Notebook 是以网页的形式打开,可以在网页页面中直接编写代码和运行代码,代码的运行结果也会直接在代码块下显示。如在编程过程中需要编写说明文档,可在同一个页面中直接编写,便于作及时的说明和解释。在今天的文章中,我将使用 Jupyter 来进行展示。
今天我就一个简单的例子来进行展示如何使用 Python 语言导入一个 CSV 文件 。这个 CSV 的文件很简单,但是我们通过这个文件来展示如何在 Python 中使用相应的 API 来导入数据。下载的 addresses.csv 文件很简单:
addresses.csv
-
id,firstname,surname,address,city,state,postcode
-
1,John,Doe,
120 jefferson st.,Riverside,NJ,
08075
-
2,Jack,McGinnis,
220 hobo Av.,Phila,PA,
09119
-
3,John Da Man,Repici,
120 Jefferson St.,Riverside,NJ,
08075
-
4,Stephen,Tyler,
7452 Terrace At the Plaz road,SomeTown,SD,
91234
-
5,Joan the bone,Anne,Jet
9th at Terrace plc,Desert City,CO,
00123
我们在自己的电脑上创建一个叫做 py-elasticsearch 的目录,并把 addresses.csv 文件拷贝到这个文件夹中。我们在这个目录中启动 jupyter:
-
$
pwd
-
/Users/liuxg/python/py-elasticsearch
-
$
ls
-
addresses.csv
安装
我们需要安装如下的部分:
Elastic Stack
你可以参照文章 “Elastic:菜鸟上手指南” 来根据自己的操作系统来安装自己的 Elasticsearch 及 Kibana。你需要启动 Elasticsearch 及 Kibana。
Jupyter
你需要按照 Jupyter 来创建 notebook。请根据自己的操作系统安装相应的软件。
Python
你可以安装最新的 Python 来进行实践。在我的电脑上,我安装的版本是 3.8.5。
$ jupyter notebook
这样就创建了我们的一个 jupyter notebook。我们创建一个叫做 py-elasticsearch 的 notebook:
我们可以在命令前面添加 !来运行 SHELL 指令。上面显示我们的 python 版本信息。
创建 Python 应用
我们接下来需要在自己的电脑上安装相应的模块:
-
pip3
install elasticsearch
-
pip3
panda
我们接下来输入如下的代码:
-
try:
-
import os
-
import sys
-
-
import elasticsearch
-
from elasticsearch
import Elasticsearch
-
import pandas as pd
-
-
print(
"All Modules Loaded ! ")
-
except
Exception
as e:
-
print(
"Some Modules are Missing {}".format(e))
你可以使用 SHIFT + ENTER 来执行代码。上面显示所有的模块都已经被装载了。如果你没有看到上面的消息,你需要安装相应的模块。
我们接下来创建一个函数来连接 Elasticsearch:
-
def connect_elasticsearch():
-
es =
None
-
es = Elasticsearch([{
'host':
'localhost',
'port':
9200}])
-
if es.ping():
-
print(
'Yupiee Connected ')
-
else:
-
print(
'Awww it could not connect!')
-
return es
-
es = connect_elasticsearch()
-
es.ping()
上面代码显示我们已经成功地连接到 Elasticsearch。接下来我们来创建一个叫做 liuxg-test 的 index:
es.indices.create(index="liuxg-test", ignore=400)
我们可以到 Kibana 中查看是否有一个叫做 liuxg-test 的 index 已经被创建:
GET _cat/indices/liuxg-test
我们接下来显示所有的索引:
-
res = es.
indices.get_alias(
"*")
-
for name
in res:
-
print(name)
接下来,我们来删除上面我们已经创建的 liuxg-test 索引:
es.indices.delete(index="liuxg-test", ignore=[400,404])
上面显示已经成功。我们可以去 Kibana,并再次查询 liuxg-test 索引:
GET _cat/indices/liuxg-test
显然这次,我们没有看到 liuxg-test 这个索引。它表明我们的索引已经被删除了。
我们接下来导入两个文档到 Elasticsearch 中去:
-
e1 = {
-
"first_name"
:"nitin",
-
"last_name"
:"panwar",
-
"age":
27,
-
"about":
"Love to play cricket",
-
"interests": [
'sports',
'music'],
-
}
-
-
e2 = {
-
"first_name" :
"Jane",
-
"last_name" :
"Smith",
-
"age" :
32,
-
"about" :
"I like to collect rock albums",
-
"interests": [
"music" ]
-
}
-
es.indices.create(
index=
'people', ignore=
400)
-
res1 = es.
index(
index=
'people', doc_type=
'_doc', body=e1, id=
1)
-
res2 = es.
index(
index=
'people', doc_type=
'_doc', body=e2, id=
2)
在上面,我们创建了一个叫做 people 的索引,并把两个文档 e1 及 e2 导入到 Elasticsearch 中:
我们可以到 Kibana 中查看所以 people 的文档:
GET people/_search
我们可以清楚地看到有两个文档被成功地导入到 Elasticsearch 中。
我们接下来删除一个文档:
res = es.delete(index='people', doc_type='_doc', id=1)
我们到 Kibana 中去查看 id 为1 的文档:
GET people/_doc/1
上面的命令显示该文档不存在。
接下来,我们来搜索所有的文档:
-
-
res = es.search(index =
'megacorp', body = {
'query': {
"match_all": {}} } )
在很多的时候,我们需要定义一个索引的 settings 及 mapping。我们可以按照如下的调用来完成:
-
settings = {
-
"settings": {
-
"number_of_shards":
1,
-
"number_of_replicas":
0
-
},
-
"mappings": {
-
"dynamic":
"true",
-
"_source": {
-
"enabled":
"true"
-
},
-
"properties": {
-
"title": {
-
"type":
"text"
-
},
-
"name": {
-
"type":
"text"
-
}
-
}
-
}
-
}
-
-
indexName =
'liuxg-with-mapping'
-
es.indices.
create(index=indexName, ignore=[
400,
404], body=settings)
我们可以在 Kibana 中进行查看:
GET liuxg-with-mapping
导入 CSV 文件
接下来,我们来进行我们的正题。我们重新创建一个叫做 csv-elasticsearch 的 Notebook:
我们打入如下的命令来装载所有的模块:
-
try:
-
import elasticsearch
-
from elasticsearch
import Elasticsearch
-
-
import pandas as pd
-
import json
-
from ast
import literal_eval
-
from tqdm
import tqdm
-
import datetime
-
import os
-
import sys
-
import numpy as np
-
-
from elasticsearch
import helpers
-
print(
"Loaded ............")
-
except
Exception
as
E:
-
print(
"Some Modules are Missing{}".format(e))
如果你看到 Loaded,则表明所有的模块都装载正确。否则,你需要安装相应的模块。接下来,我们确保我们之前的 addresses.csv 位于我们的 Jupyter 启动的目录里:
-
for
name
in os.listdir():
-
print(
name)
我们接下来尝试阅读这个 csv 文件。
-
df = pd.read_csv("addresses.csv")
-
df.head(
2)
显然 Pandas 很方便地让我们读入我们的数据。上面显示我们有5个文档,有7列。在下面的代码中,我们将把文档中的 id 当做 Elasticsearch 文档中的 id。这个 id 是唯一的。我们来创建连接到 Elasticsearch 的实例:
-
ENDPOINT =
"http://localhost:9200"
-
es = Elasticsearch(timeout=600, hosts = ENDPOINT)
-
es.ping()
在上面,我们对数据做了简单的清洗。对于确实任何一个项的文档,我们直接去掉。接下来,我们把 df 中的数据转换为 Elasticsearch 可以理解的格式:
df2 = df.to_dict('records')
我们需要创建一个 generator 来把数据写入到 Elasticsearch:
-
def generator(df2):
-
for c, line
in enumerate(df2):
-
try:
-
yield {
-
'_index':
"addresses",
-
'_type':
'_doc',
-
'_id': line.get(
"id", None),
-
'_source': {
-
"firstname"
:line.get(
"firstname",
""),
-
"surname"
:line.get(
"surname",
""),
-
"address"
:line.get(
"address",
""),
-
"city"
:line.get(
"city",
""),
-
"state"
:line.get(
"state",
""),
-
"postcode"
:line.get(
"postcode",
"")
-
}
-
}
-
except
StopIteration:
-
return
接下来,我们使用 helper 来导入数据:
-
try:
-
res = helper.bulk(es,
generator(df2))
-
print(
"Working")
-
except
Exception
as e:
-
pass
我们到 Kibana 中查看 addresses 索引:
GET _cat/indices/addresses?v
-
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
-
yellow open addresses eQCjFfWQTIyMR
2D
4eCyv-g
1
1
5
5
17.
4kb
17.
4kb
上面显示有四个文档。我们最早的 CSV 文档中有5个文档,这是因为我们在进行 helper.bulk 之前,已经调用过 next(my) 一次。
我们可以使用如下的命令来进行查询:
更多关于如何使用 Python 导入数据到 Elasticsearch 的介绍,可以参考文章 “Elasticsearch:Elasticsearch 开发入门 - Python”。
转载:https://blog.csdn.net/UbuntuTouch/article/details/117067988