Blog

Un exemple d'InfluxDB : gestion des données de séries temporelles en temps réel

Chargement...

10 min de lecture

Un exemple d'InfluxDB : gestion des données de séries temporelles en temps réel

Après la sortie de notre add-on InfluxDB, vous vous demandez peut-être si une base de données de séries temporelles peut répondre à vos besoins. Dans cet article de blog, nous allons expliquer un exemple d'utilisation typique d'une base de données de séries temporelles.

How Scalingo makes migrating from Heroku easy

Après la publication de notre addon InfluxDB, vous vous demandez peut-être si une base de données de séries temporelles peut correspondre à vos besoins. Dans cet article de blog, nous allons expliquer un exemple d'utilisation typique d'une base de données de séries temporelles.

Un exemple d'utilisation

Voyons un exemple simple d'application utilisant InfluxDB comme base de données de séries temporelles. Cette application surveille un hashtag sur Twitter et compte combien de tweets sont publiés avec ce hashtag chaque minute. Les résultats sont affichés sur un graphique disponible ici.

Dans cet exemple, différentes technologies sont utilisées. Le backend est construit avec :

  • La merveilleuse base de données de séries temporelles InfluxDB pour stocker un nouveau point de données pour chaque tweet contenant le hashtag surveillé

  • Un worker Go qui utilise la go-twitter bibliothèque pour s'abonner à un flux public Twitter.

  • Un serveur web Go utilisant le framework Martini.

Le frontend utilise les technologies suivantes :

  • JavaScript pur pour afficher les résultats sur un graphique généré à l'aide de la bibliothèque C3.js.

Vous pouvez voir l'application ici et lire le code sur GitHub. Voici une capture d'écran de cet exemple :



Screenshot of the InfluxDB sample



Comment InfluxDB est utilisé dans cet exemple

Les données dans InfluxDB sont organisées par séries temporelles, chacune d'elles contenant une valeur mesurée. Dans cet exemple, la mesure est le nombre de tweets, les tags sont le type de tweet (c'est-à-dire tweet, DM ou événement) et le hashtag (afin que nous puissions surveiller plusieurs hashtags), et le seul champ est la mesure.

Chaque fois qu'un tweet est publié contenant le hashtag surveillé, nous ajoutons un point de données à InfluxDB en utilisant la date de création du tweet fournie par l'API Twitter comme horodatage. Cette partie est expliquée plus en détail dans la section suivante.

Maintenant, dans le frontend, nous voulons afficher le nombre de tweets chaque minute au cours de la dernière heure. Pour récupérer des données de la base de données, InfluxDB fournit InfluxQL, un langage semblable à SQL. Dans notre cas, la requête est facilement interprétée par quiconque comprenant SQL :

SELECT sum(value) FROM tweets WHERE hashtag = 'RT' AND time >= now() - 1h GROUP BY time(1m) ORDER BY time DESC LIMIT 60
SELECT sum(value) FROM tweets WHERE hashtag = 'RT' AND time >= now() - 1h GROUP BY time(1m) ORDER BY time DESC LIMIT 60
SELECT sum(value) FROM tweets WHERE hashtag = 'RT' AND time >= now() - 1h GROUP BY time(1m) ORDER BY time DESC LIMIT 60
SELECT sum(value) FROM tweets WHERE hashtag = 'RT' AND time >= now() - 1h GROUP BY time(1m) ORDER BY time DESC LIMIT 60
  • value contient le nombre de tweets avec le hashtag spécifié chaque seconde.

  • WHERE hashtag = 'RT' AND time >= now() - 1h filtre le hashtag spécifié et uniquement pour l'heure passée.

  • GROUP BY time(1m) détermine comment InfluxDB va regrouper les résultats de la requête dans le temps. Ici, nous avons la somme des valeurs pour chaque minute. Plus d'informations dans la documentation d'InfluxDB.

  • ORDER BY time DESC LIMIT 60 pour obtenir les 60 derniers résultats. Avec le précédent GROUP BY, cela garantit que nous obtenons les résultats pour l'heure passée.

Vous pouvez exécuter cette requête en utilisant la console InfluxDB. Vous pouvez obtenir cette console avec le Scaling CLI :

scalingo --app sample-influxdb influxdb-console
scalingo --app sample-influxdb influxdb-console
scalingo --app sample-influxdb influxdb-console
scalingo --app sample-influxdb influxdb-console

InfluxDB fournit également un moyen de supprimer automatiquement les enregistrements plus anciens qu'une certaine date. Comme nous n'avons pas besoin des données plus anciennes qu'une heure, nous créons une politique de rétention par défaut d'une heure :

CREATE RETENTION POLICY "one_hour" ON "tweets" DURATION 1h REPLICATION 1 DEFAULT
CREATE RETENTION POLICY "one_hour" ON "tweets" DURATION 1h REPLICATION 1 DEFAULT
CREATE RETENTION POLICY "one_hour" ON "tweets" DURATION 1h REPLICATION 1 DEFAULT
CREATE RETENTION POLICY "one_hour" ON "tweets" DURATION 1h REPLICATION 1 DEFAULT

Créer une politique de rétention n'est pas disponible pour les utilisateurs de base de données simples. Si vous souhaitez en créer une, vous pouvez vous connecter à votre tableau de bord de base de données et aller à l'onglet "Avancé".



Advanced tab of the database dashboard



Plus d'informations sur le downsampling et la rétention des données sont disponibles dans la documentation.

La partie Go de l'exemple

InfluxDB fournit une bibliothèque officielle pour interagir avec un serveur InfluxDB. Nous allons montrer comment nous l'avons utilisée dans le contexte de cet exemple. En utilisant cette bibliothèque, les données de séries temporelles sont appelées points. Ces points sont écrits dans la base de données à l'aide d'inserts par lots.

Chaque fois qu'un tweet est publié en utilisant le hashtag surveillé, nous incrémentons un compteur et ajoutons ou mettons à jour un point dans la base de données. Le snippet suivant montre la fonction appelée chaque fois qu'un tweet est publié. Ce code appartient au worker de notre exemple.

Pour plus de clarté, nous avons sauté tous les contrôles d'erreur dans ces extraits de code. N'oubliez pas de les ajouter dans votre code de production...

func addTweet(createdAt, type string) {
  bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
    Database:  config.InfluxConnectionInformation.Database,
    Precision: "s",
  })

  pt, _ := influx.NewPoint(
    "tweets",
    map[string]string{"type": type, "hashtag": config.E["HASHTAG"]},
    map[string]interface{}{"value": nbTweetsCurrentSecond},
    createdAt
  )
  bp.AddPoint(pt)

  client, _ := influx.NewHTTPClient(influx.HTTPConfig{
    Addr:     config.InfluxConnectionInformation.Host,
    Username: config.InfluxConnectionInformation.User,
    Password: config.InfluxConnectionInformation.Password,
  })
  defer client.Close()

  client.Write(bp

func addTweet(createdAt, type string) {
  bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
    Database:  config.InfluxConnectionInformation.Database,
    Precision: "s",
  })

  pt, _ := influx.NewPoint(
    "tweets",
    map[string]string{"type": type, "hashtag": config.E["HASHTAG"]},
    map[string]interface{}{"value": nbTweetsCurrentSecond},
    createdAt
  )
  bp.AddPoint(pt)

  client, _ := influx.NewHTTPClient(influx.HTTPConfig{
    Addr:     config.InfluxConnectionInformation.Host,
    Username: config.InfluxConnectionInformation.User,
    Password: config.InfluxConnectionInformation.Password,
  })
  defer client.Close()

  client.Write(bp

func addTweet(createdAt, type string) {
  bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
    Database:  config.InfluxConnectionInformation.Database,
    Precision: "s",
  })

  pt, _ := influx.NewPoint(
    "tweets",
    map[string]string{"type": type, "hashtag": config.E["HASHTAG"]},
    map[string]interface{}{"value": nbTweetsCurrentSecond},
    createdAt
  )
  bp.AddPoint(pt)

  client, _ := influx.NewHTTPClient(influx.HTTPConfig{
    Addr:     config.InfluxConnectionInformation.Host,
    Username: config.InfluxConnectionInformation.User,
    Password: config.InfluxConnectionInformation.Password,
  })
  defer client.Close()

  client.Write(bp

func addTweet(createdAt, type string) {
  bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
    Database:  config.InfluxConnectionInformation.Database,
    Precision: "s",
  })

  pt, _ := influx.NewPoint(
    "tweets",
    map[string]string{"type": type, "hashtag": config.E["HASHTAG"]},
    map[string]interface{}{"value": nbTweetsCurrentSecond},
    createdAt
  )
  bp.AddPoint(pt)

  client, _ := influx.NewHTTPClient(influx.HTTPConfig{
    Addr:     config.InfluxConnectionInformation.Host,
    Username: config.InfluxConnectionInformation.User,
    Password: config.InfluxConnectionInformation.Password,
  })
  defer client.Close()

  client.Write(bp

La variable config.InfluxConnectionInformation est de type *InfluxInfo. Nous l'avons remplie en analysant la variable d'environnement SCALINGO_INFLUX_URL avec la fonction suivante :

func parseConnectionString(con string) (*InfluxInfo, error) {
        url, err := url.Parse(con)
        if err != nil {
                return nil, errgo.Mask(err)
        }

        var password, username string
        if url.User != nil {
                password, _ = url.User.Password()
                username = url.User.Username()
        }

        return &InfluxInfo{
                Host:             url.Scheme + "://" + url.Host,
                User:             username,
                Password:         password,
                Database:         url.Path[1:],
                ConnectionString: con,
        }, nil

func parseConnectionString(con string) (*InfluxInfo, error) {
        url, err := url.Parse(con)
        if err != nil {
                return nil, errgo.Mask(err)
        }

        var password, username string
        if url.User != nil {
                password, _ = url.User.Password()
                username = url.User.Username()
        }

        return &InfluxInfo{
                Host:             url.Scheme + "://" + url.Host,
                User:             username,
                Password:         password,
                Database:         url.Path[1:],
                ConnectionString: con,
        }, nil

func parseConnectionString(con string) (*InfluxInfo, error) {
        url, err := url.Parse(con)
        if err != nil {
                return nil, errgo.Mask(err)
        }

        var password, username string
        if url.User != nil {
                password, _ = url.User.Password()
                username = url.User.Username()
        }

        return &InfluxInfo{
                Host:             url.Scheme + "://" + url.Host,
                User:             username,
                Password:         password,
                Database:         url.Path[1:],
                ConnectionString: con,
        }, nil

func parseConnectionString(con string) (*InfluxInfo, error) {
        url, err := url.Parse(con)
        if err != nil {
                return nil, errgo.Mask(err)
        }

        var password, username string
        if url.User != nil {
                password, _ = url.User.Password()
                username = url.User.Username()
        }

        return &InfluxInfo{
                Host:             url.Scheme + "://" + url.Host,
                User:             username,
                Password:         password,
                Database:         url.Path[1:],
                ConnectionString: con,
        }, nil

Enfin, le frontend de notre application a besoin d'un moyen pour lire les données de la base de données. Nous avons ajouté un endpoint GET /tweets dans le serveur web qui retourne le nombre de tweets chaque minute pour la dernière heure. La réponse est encodée en JSON pour s'intégrer parfaitement avec la bibliothèque JavaScript que nous utilisons.

Ce point de terminaison construit et exécute la requête InfluxQL :

func LastMinutesTweets() ([]InfluxValue, error) {
        queryString := "SELECT SUM('value') FROM 'tweets'"
        queryString += " WHERE hashtag = '" + config.E["HASHTAG"] + "'"
        queryString += " AND time >= now() - 60m"
        queryString += " GROUP BY time(1m) fill(none) ORDER BY time DESC LIMIT 60"

        return executeQuery(queryString)
}

func executeQuery(queryString string) ([]InfluxValue, error) {
  client, _ := influx.NewHTTPClient(influx.HTTPConfig{
    Addr:     config.InfluxConnectionInformation.Host,
    Username: config.InfluxConnectionInformation.User,
    Password: config.InfluxConnectionInformation.Password,
  })
  defer client.Close()

  response, _ := client.Query(influx.Query{
    Command:  queryString,
    Database: config.InfluxConnectionInformation.Database,
  })

  if response.Error() != nil {
    panic(response.Error())
  }

  return convertResults(response

func LastMinutesTweets() ([]InfluxValue, error) {
        queryString := "SELECT SUM('value') FROM 'tweets'"
        queryString += " WHERE hashtag = '" + config.E["HASHTAG"] + "'"
        queryString += " AND time >= now() - 60m"
        queryString += " GROUP BY time(1m) fill(none) ORDER BY time DESC LIMIT 60"

        return executeQuery(queryString)
}

func executeQuery(queryString string) ([]InfluxValue, error) {
  client, _ := influx.NewHTTPClient(influx.HTTPConfig{
    Addr:     config.InfluxConnectionInformation.Host,
    Username: config.InfluxConnectionInformation.User,
    Password: config.InfluxConnectionInformation.Password,
  })
  defer client.Close()

  response, _ := client.Query(influx.Query{
    Command:  queryString,
    Database: config.InfluxConnectionInformation.Database,
  })

  if response.Error() != nil {
    panic(response.Error())
  }

  return convertResults(response

func LastMinutesTweets() ([]InfluxValue, error) {
        queryString := "SELECT SUM('value') FROM 'tweets'"
        queryString += " WHERE hashtag = '" + config.E["HASHTAG"] + "'"
        queryString += " AND time >= now() - 60m"
        queryString += " GROUP BY time(1m) fill(none) ORDER BY time DESC LIMIT 60"

        return executeQuery(queryString)
}

func executeQuery(queryString string) ([]InfluxValue, error) {
  client, _ := influx.NewHTTPClient(influx.HTTPConfig{
    Addr:     config.InfluxConnectionInformation.Host,
    Username: config.InfluxConnectionInformation.User,
    Password: config.InfluxConnectionInformation.Password,
  })
  defer client.Close()

  response, _ := client.Query(influx.Query{
    Command:  queryString,
    Database: config.InfluxConnectionInformation.Database,
  })

  if response.Error() != nil {
    panic(response.Error())
  }

  return convertResults(response

func LastMinutesTweets() ([]InfluxValue, error) {
        queryString := "SELECT SUM('value') FROM 'tweets'"
        queryString += " WHERE hashtag = '" + config.E["HASHTAG"] + "'"
        queryString += " AND time >= now() - 60m"
        queryString += " GROUP BY time(1m) fill(none) ORDER BY time DESC LIMIT 60"

        return executeQuery(queryString)
}

func executeQuery(queryString string) ([]InfluxValue, error) {
  client, _ := influx.NewHTTPClient(influx.HTTPConfig{
    Addr:     config.InfluxConnectionInformation.Host,
    Username: config.InfluxConnectionInformation.User,
    Password: config.InfluxConnectionInformation.Password,
  })
  defer client.Close()

  response, _ := client.Query(influx.Query{
    Command:  queryString,
    Database: config.InfluxConnectionInformation.Database,
  })

  if response.Error() != nil {
    panic(response.Error())
  }

  return convertResults(response

Avec la puissance de Go et InfluxDB combinées, nous montrons à quel point il est facile de gérer des données de séries temporelles et de les afficher en temps réel sur une page web. En utilisant Scalingo, une telle application peut être mise en ligne en quelques minutes.

Etienne Michon, Scalingo

Étienne Michon

Docteur en informatique, Étienne Michon occupe actuellement le poste d'ingénieur R&D chez Scalingo. Il était l'un des premiers employés de Scalingo et il contribue grandement à faire grandir ce blog grâce à ses articles techniques de qualité.

Restez informé

Recevez des articles et des mises à jour de la plateforme dans votre boîte de réception.

Prêt à déployer en toute confiance ?

Découvrez des déploiements sans temps d'arrêt, une mise à l'échelle automatique intelligente et une infrastructure entièrement gérée. Commencez à déployer vos applications sur Scalingo dès aujourd'hui.

Aucune carte de crédit requise • Déployez en quelques minutes • Annulez à tout moment

Dégradé arrière-plan section

Déployez une application ou base de données

Commencez à déployer

Rejoignez les équipes qui misent sur une plateforme conçue pour livrer rapidement, opérer sereinement, avec des valeurs européennes et un support humain.

Dégradé arrière-plan section

Déployez une application ou base de données

Commencez à déployer

Rejoignez les équipes qui misent sur une plateforme conçue pour livrer rapidement, opérer sereinement, avec des valeurs européennes et un support humain.

Dégradé arrière-plan section

Déployez une application ou base de données

Commencez à déployer

Rejoignez les équipes qui misent sur une plateforme conçue pour livrer rapidement, opérer sereinement, avec des valeurs européennes et un support humain.