Проблемы с записью Dataframe в MySQL

311 просмотра
0
0 Комментариев

необходимо записать данные из dataframe в таблицу в бд MySQL. Пытаюсь сделать это вот так:

from pathlib import Path
import pandas as pd
import numpy as np
import pymysql
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import GridSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.externals import joblib
import mysql.connector
from sqlalchemy import create_engine
 
def fit_log_regression(X, y, **grid_kwargs):
    # pipe line: vectorize tweets (one hot encoding), LogisticRegression
    pipeline = Pipeline([
        ("vect", CountVectorizer()),
        ("LogRegr", LogisticRegression())])
 
    param_grid = dict(vect__min_df=[1],  # [2, 3, 5, 10]
                      #vect__ngram_range=[(1,1),(1,2),(1,3),(1,4),(1,5),(2,2),(2,3),(2,4),(2,5)],
                      vect__ngram_range=[(1,2)],
                      vect__analyzer=[ 'char_wb'],#, 'char_wb'],
                      LogRegr__C=[5],  # [0.1, 1, 10, 100],
                      LogRegr__max_iter=[100])
 
    # optimize hyperparameters, using [param_grid]
    grid_search = GridSearchCV(pipeline, param_grid=param_grid, **grid_kwargs)
    grid_search.fit(X, y)
    return grid_search
 
def fit_multinomial_nb(X, y, **grid_kwargs):
    # pipe line: vectorize tweets (one hot encoding), MultinomialNB
    pipeline = Pipeline([
        ("vect", CountVectorizer()),
        ("MultinomNB", MultinomialNB())])
 
    param_grid = dict(vect__min_df=[7],
                      vect__ngram_range=[(2,5)],
                      vect__analyzer=['char_wb'],
                      MultinomNB__alpha=[0.01])
    # optimize hyperparameters, using [param_grid]
    grid_search = GridSearchCV(pipeline, param_grid=param_grid, **grid_kwargs)
    grid_search.fit(X, y)
    return grid_search
 
def print_grid_results(grid_search):
    print('Best score {}'.format(grid_search.best_score_))
    print('-' * 70)
    print('Best estimator')
    print(grid_search.best_estimator_)
    print('*' * 70)
    print('Best parameters:')
    print('*' * 70)
    print(grid_search.best_params_)
    print('-' * 70)
 
def delete_tabs(str):
    str = str.lstrip()
    str = str.rstrip()
    return str
 
def main(path):
    # read data set into DF. Only the following columns: ['id','tdate','ttext','ttype']
    df = pd.read_csv('D:\pos_neg_2.csv', sep=';', header=None,
                     names=['id','tdate','ttext','ttype'],
                     usecols=[0,1,3,4])
    # Speed up: randomly select 5% of data
    # comment it out to achieve the best prediction performance (VERY SLOW!)
    df = df.sample(frac=0.3)
    grid_lr = fit_log_regression(df['ttext'], df['ttype'], cv=3, verbose=1, n_jobs=-1)
    grid_nb = fit_multinomial_nb(df['ttext'], df['ttype'], cv=3, verbose=1, n_jobs=-1)
 
    print_grid_results(grid_lr)
    print_grid_results(grid_nb)
 
    # persist trained models
    joblib.dump(grid_lr, 'grid_search_lr.pkl')
    joblib.dump(grid_nb, 'grid_search_nb.pkl')
 
    features = np.array(grid_lr.best_estimator_.named_steps['vect'].get_feature_names())
    coefs = pd.Series(grid_lr.best_estimator_.named_steps['LogRegr'].coef_.ravel(), features)
    print('top 20 positive features:')
    print(coefs.nlargest(20))
    print('-' * 70)
    print('top 20 negative features:')
    print(coefs.nsmallest(20))
    print('-' * 70)
 
 
  #  test = pd.DataFrame({
  #      'ttext':['Прекрасные актеры, но фильм отвратительный',
  #               'Ну сходил я на этот фильм. Отзывы были нормальные, а оказалось - отстой!',
  #               'StackOverflow рулит','отличный фильм!'
   #             ]
   # })
 
    db = pymysql.connect(host='localhost', user='root', passwd='',
                    database='mom_db', charset='utf8')
 
    test = pd.read_sql("SELECT comm FROM comments ", db)
 
    test['comm'] = test['comm'].apply(delete_tabs)
    #test['expected'] = [-1, -1, 1, 1]
    test['pred_lr'] = grid_lr.best_estimator_.predict(test['comm'])
    test['pred_nb'] = grid_nb.best_estimator_.predict(test['comm'])
    pd.options.display.expand_frame_repr = False
    print(test)
    engine = create_engine('mysql+mysqlconnector://[root]:[]@[localhost]:[3306]/[mom_db]', echo=False)
    cnx = engine.raw_connection()
    test.to_sql(name='est_comm', con=cnx, if_exists = 'append', index=False)
    #features = np.array(grid_search.best_estimator_.named_steps['CountVectorizer'].get_feature_names()[:5])
 
if __name__ == "__main__":
    main(r'pos_neg_2.csv')

вплоть до записи в таблицу все работает отлично, а вот на записи выдает такую ошибку:

gaierror                                  Traceback (most recent call last)
D:\anaconda\lib\site-packages\mysql\connector\network.py in open_connection(self)
    447                                            0, socket.SOCK_STREAM,
--> 448                                            socket.SOL_TCP)
    449             # If multiple results we favor IPv4, unless IPv6 was forced.
 
D:\anaconda\lib\socket.py in getaddrinfo(host, port, family, type, proto, flags)
    744     addrlist = []
--> 745     for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
    746         af, socktype, proto, canonname, sa = res
 
gaierror: [Errno 11001] getaddrinfo failed
 
During handling of the above exception, another exception occurred:
 
InterfaceError                            Traceback (most recent call last)
D:\anaconda\lib\site-packages\sqlalchemy\engine\base.py in _wrap_pool_connect(self, fn, connection)
   2157         try:
-> 2158             return fn()
   2159         except dialect.dbapi.Error as e:
 
D:\anaconda\lib\site-packages\sqlalchemy\pool.py in unique_connection(self)
    344         """
--> 345         return _ConnectionFairy._checkout(self)
    346
 
D:\anaconda\lib\site-packages\sqlalchemy\pool.py in _checkout(cls, pool, threadconns, fairy)
    781         if not fairy:
--> 782             fairy = _ConnectionRecord.checkout(pool)
    783
 
D:\anaconda\lib\site-packages\sqlalchemy\pool.py in checkout(cls, pool)
    531     def checkout(cls, pool):
--> 532         rec = pool._do_get()
    533         try:
 
D:\anaconda\lib\site-packages\sqlalchemy\pool.py in _do_get(self)
   1185                 with util.safe_reraise():
-> 1186                     self._dec_overflow()
   1187         else:
 
D:\anaconda\lib\site-packages\sqlalchemy\util\langhelpers.py in __exit__(self, type_, value, traceback)
     65             if not self.warn_only:
---> 66                 compat.reraise(exc_type, exc_value, exc_tb)
     67         else:
 
D:\anaconda\lib\site-packages\sqlalchemy\util\compat.py in reraise(tp, value, tb, cause)
    186             raise value.with_traceback(tb)
--> 187         raise value
    188
 
D:\anaconda\lib\site-packages\sqlalchemy\pool.py in _do_get(self)
   1182             try:
-> 1183                 return self._create_connection()
   1184             except:
 
D:\anaconda\lib\site-packages\sqlalchemy\pool.py in _create_connection(self)
    349
--> 350         return _ConnectionRecord(self)
    351
 
D:\anaconda\lib\site-packages\sqlalchemy\pool.py in __init__(self, pool, connect)
    476         if connect:
--> 477             self.__connect(first_connect_check=True)
    478         self.finalize_callback = deque()
 
D:\anaconda\lib\site-packages\sqlalchemy\pool.py in __connect(self, first_connect_check)
    666             self.starttime = time.time()
--> 667             connection = pool._invoke_creator(self)
    668             pool.logger.debug("Created new connection %r", connection)
 
D:\anaconda\lib\site-packages\sqlalchemy\engine\strategies.py in connect(connection_record)
    104                             return connection
--> 105                 return dialect.connect(*cargs, **cparams)
    106
 
D:\anaconda\lib\site-packages\sqlalchemy\engine\default.py in connect(self, *cargs, **cparams)
    409     def connect(self, *cargs, **cparams):
--> 410         return self.dbapi.connect(*cargs, **cparams)
    411
 
D:\anaconda\lib\site-packages\mysql\connector\__init__.py in connect(*args, **kwargs)
    161     # Regular connection
--> 162     return MySQLConnection(*args, **kwargs)
    163 Connect = connect  # pylint: disable=C0103
 
D:\anaconda\lib\site-packages\mysql\connector\connection.py in __init__(self, *args, **kwargs)
    128         if len(kwargs) > 0:
--> 129             self.connect(**kwargs)
    130
 
D:\anaconda\lib\site-packages\mysql\connector\connection.py in connect(self, **kwargs)
    453         self.disconnect()
--> 454         self._open_connection()
    455         self._post_connection()
 
D:\anaconda\lib\site-packages\mysql\connector\connection.py in _open_connection(self)
    416         self._socket = self._get_connection()
--> 417         self._socket.open_connection()
    418         self._do_handshake()
 
D:\anaconda\lib\site-packages\mysql\connector\network.py in open_connection(self)
    463             raise errors.InterfaceError(
--> 464                 errno=2003, values=(self.get_address(), _strioerror(err)))
    465         else:
 
InterfaceError: 2003: Can't connect to MySQL server on 'localhost]:[3306:3306' (11001 getaddrinfo failed)
 
The above exception was the direct cause of the following exception:
 
InterfaceError                            Traceback (most recent call last)
<ipython-input-12-6c628b2d709e> in <module>()
    113
    114 if __name__ == "__main__":
--> 115     main(r'pos_neg_2.csv')
 
<ipython-input-12-6c628b2d709e> in main(path)
    108     print(test)
    109     engine = create_engine('mysql+mysqlconnector://[root]:[]@[localhost]:[3306]/[mom_db]', echo=False)
--> 110     cnx = engine.raw_connection()
    111     test.to_sql(name='est_comm', con=cnx, if_exists = 'append', index=False)
    112     #features = np.array(grid_search.best_estimator_.named_steps['CountVectorizer'].get_feature_names()[:5])
 
D:\anaconda\lib\site-packages\sqlalchemy\engine\base.py in raw_connection(self, _connection)
   2186         """
   2187         return self._wrap_pool_connect(
-> 2188             self.pool.unique_connection, _connection)
   2189
   2190
 
D:\anaconda\lib\site-packages\sqlalchemy\engine\base.py in _wrap_pool_connect(self, fn, connection)
   2160             if connection is None:
   2161                 Connection._handle_dbapi_exception_noconnection(
-> 2162                     e, dialect, self)
   2163             else:
   2164                 util.reraise(*sys.exc_info())
 
D:\anaconda\lib\site-packages\sqlalchemy\engine\base.py in _handle_dbapi_exception_noconnection(cls, e, dialect, engine)
   1474             util.raise_from_cause(
   1475                 sqlalchemy_exception,
-> 1476                 exc_info
   1477             )
   1478         else:
 
D:\anaconda\lib\site-packages\sqlalchemy\util\compat.py in raise_from_cause(exception, exc_info)
    201     exc_type, exc_value, exc_tb = exc_info
    202     cause = exc_value if exc_value is not exception else None
--> 203     reraise(type(exception), exception, tb=exc_tb, cause=cause)
    204
    205 if py3k:
 
D:\anaconda\lib\site-packages\sqlalchemy\util\compat.py in reraise(tp, value, tb, cause)
    184             value.__cause__ = cause
    185         if value.__traceback__ is not tb:
--> 186             raise value.with_traceback(tb)
    187         raise value
    188
 
D:\anaconda\lib\site-packages\sqlalchemy\engine\base.py in _wrap_pool_connect(self, fn, connection)
   2156         dialect = self.dialect
   2157         try:
-> 2158             return fn()
   2159         except dialect.dbapi.Error as e:
   2160             if connection is None:
 
D:\anaconda\lib\site-packages\sqlalchemy\pool.py in unique_connection(self)
    343
    344         """
--> 345         return _ConnectionFairy._checkout(self)
    346
    347     def _create_connection(self):
 
D:\anaconda\lib\site-packages\sqlalchemy\pool.py in _checkout(cls, pool, threadconns, fairy)
    780     def _checkout(cls, pool, threadconns=None, fairy=None):
    781         if not fairy:
--> 782             fairy = _ConnectionRecord.checkout(pool)
    783
    784             fairy._pool = pool
 
D:\anaconda\lib\site-packages\sqlalchemy\pool.py in checkout(cls, pool)
    530     @classmethod
    531     def checkout(cls, pool):
--> 532         rec = pool._do_get()
    533         try:
    534             dbapi_connection = rec.get_connection()
 
D:\anaconda\lib\site-packages\sqlalchemy\pool.py in _do_get(self)
   1184             except:
   1185                 with util.safe_reraise():
-> 1186                     self._dec_overflow()
   1187         else:
   1188             return self._do_get()
 
D:\anaconda\lib\site-packages\sqlalchemy\util\langhelpers.py in __exit__(self, type_, value, traceback)
     64             self._exc_info = None   # remove potential circular references
     65             if not self.warn_only:
---> 66                 compat.reraise(exc_type, exc_value, exc_tb)
     67         else:
     68             if not compat.py3k and self._exc_info and self._exc_info[1]:
 
D:\anaconda\lib\site-packages\sqlalchemy\util\compat.py in reraise(tp, value, tb, cause)
    185         if value.__traceback__ is not tb:
    186             raise value.with_traceback(tb)
--> 187         raise value
    188
    189 else:
 
D:\anaconda\lib\site-packages\sqlalchemy\pool.py in _do_get(self)
   1181         if self._inc_overflow():
   1182             try:
-> 1183                 return self._create_connection()
   1184             except:
   1185                 with util.safe_reraise():
 
D:\anaconda\lib\site-packages\sqlalchemy\pool.py in _create_connection(self)
    348         """Called by subclasses to create a new ConnectionRecord."""
    349
--> 350         return _ConnectionRecord(self)
    351
    352     def _invalidate(self, connection, exception=None, _checkin=True):
 
D:\anaconda\lib\site-packages\sqlalchemy\pool.py in __init__(self, pool, connect)
    475         self.__pool = pool
    476         if connect:
--> 477             self.__connect(first_connect_check=True)
    478         self.finalize_callback = deque()
    479
 
D:\anaconda\lib\site-packages\sqlalchemy\pool.py in __connect(self, first_connect_check)
    665         try:
    666             self.starttime = time.time()
--> 667             connection = pool._invoke_creator(self)
    668             pool.logger.debug("Created new connection %r", connection)
    669             self.connection = connection
 
D:\anaconda\lib\site-packages\sqlalchemy\engine\strategies.py in connect(connection_record)
    103                         if connection is not None:
    104                             return connection
--> 105                 return dialect.connect(*cargs, **cparams)
    106
    107             creator = pop_kwarg('creator', connect)
 
D:\anaconda\lib\site-packages\sqlalchemy\engine\default.py in connect(self, *cargs, **cparams)
    408
    409     def connect(self, *cargs, **cparams):
--> 410         return self.dbapi.connect(*cargs, **cparams)
    411
    412     def create_connect_args(self, url):
 
D:\anaconda\lib\site-packages\mysql\connector\__init__.py in connect(*args, **kwargs)
    160
    161     # Regular connection
--> 162     return MySQLConnection(*args, **kwargs)
    163 Connect = connect  # pylint: disable=C0103
    164
 
D:\anaconda\lib\site-packages\mysql\connector\connection.py in __init__(self, *args, **kwargs)
    127
    128         if len(kwargs) > 0:
--> 129             self.connect(**kwargs)
    130
    131     def _get_self(self):
 
D:\anaconda\lib\site-packages\mysql\connector\connection.py in connect(self, **kwargs)
    452
    453         self.disconnect()
--> 454         self._open_connection()
    455         self._post_connection()
    456
 
D:\anaconda\lib\site-packages\mysql\connector\connection.py in _open_connection(self)
    415         """
    416         self._socket = self._get_connection()
--> 417         self._socket.open_connection()
    418         self._do_handshake()
    419         self._do_auth(self._user, self._password,
 
D:\anaconda\lib\site-packages\mysql\connector\network.py in open_connection(self)
    462         except IOError as err:
    463             raise errors.InterfaceError(
--> 464                 errno=2003, values=(self.get_address(), _strioerror(err)))
    465         else:
    466             (self._family, socktype, proto, _, sockaddr) = addrinfo
 
InterfaceError: (mysql.connector.errors.InterfaceError) 2003: Can't connect to MySQL server on 'localhost]:[3306:3306' (11001 getaddrinfo failed) (Background on this error at: http://sqlalche.me/e/rvf5)

и вот все данные:
вот все параметры подключения к бд

что я делаю не так?
P.S. вместо localhost указывать 127.0.0.1 пробовал, результат идентичный


Добавить комментарий

2 Answers

Python Опубликовано 12.12.2018
0

В строке подключения не нужно брать в квадратные скобки. Попробуйте без них:

engine = create_engine('mysql+mysqlconnector://root@localhost:3306/mom_db', echo=False)

Добавить комментарий
0

Попробуйте так:

from sqlalchemy import create_engine
import pymysql
import pandas as pd
 
db_connection = 'mysql+pymysql://root:mysql_password@localhost/mom_db?charset=utf8mb4'
conn = create_engine(db_connection)
 
df = pd.read_sql("...", conn)
...
test.to_sql(name='est_comm', con=conn, if_exists='append', index=False)
...

PS один и тот же SQL Alchemy connection / engine объект conn можно и нужно использовать и для чтения и для записи из/в MySQL DB

Добавить комментарий
Напишите свой ответ на данный вопрос.
Scroll Up